Coverage for adhoc-cicd-odoo-odoo / odoo / tests / common.py: 27%
1587 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1# -*- coding: utf-8 -*-
2"""
3The module :mod:`odoo.tests.common` provides unittest test cases and a few
4helpers and classes to write tests.
6"""
7from __future__ import annotations
9import base64
10import binascii
11import concurrent.futures
12import contextlib
13import difflib
14import importlib
15import inspect
16import itertools
17import json
18import logging
19import os
20import pathlib
21import platform
22import pprint
23import psutil
24import re
25import shutil
26import signal
27import subprocess
28import sys
29import tempfile
30import threading
31import time
32import traceback
33import unittest
34import warnings
35from collections import defaultdict, deque
36from concurrent.futures import CancelledError, Future, InvalidStateError, wait
37from contextlib import contextmanager, ExitStack
38from copy import deepcopy
39from datetime import datetime
40from functools import lru_cache, partial, wraps
41from itertools import islice, zip_longest
42from textwrap import shorten
43from typing import Optional, Iterable, cast
44from unittest import TestResult
45from unittest.mock import patch, _patch, Mock
46from urllib.parse import parse_qsl, urlencode, urljoin, urlsplit, urlunsplit
47from xmlrpc import client as xmlrpclib
48from uuid import uuid4
49from werkzeug.exceptions import BadRequest
51import freezegun
52import requests
53from lxml import etree, html
54from passlib.context import CryptContext
55from requests import PreparedRequest, Session
57import odoo.addons.base
58import odoo.cli
59import odoo.http
60import odoo.models
61import odoo.orm.registry
62from odoo import api
63from odoo.exceptions import AccessError
64from odoo.fields import Command
65from odoo.modules.registry import Registry, DummyRLock
66from odoo.service import security
67from odoo.sql_db import Cursor, Savepoint
68from odoo.tools import config, float_compare, mute_logger, profiler, SQL, DotDict
69from odoo.tools.mail import single_email_re
70from odoo.tools.misc import find_in_path, lower_logging
71from odoo.tools.xml_utils import _validate_xml
72from odoo.addons.base.models import ir_actions_report
74from . import case, test_cursor
75from .result import OdooTestResult
77try:
78 import websocket
79except ImportError:
80 # chrome headless tests will be skipped
81 websocket = None
# Module-level logger of the test framework.
_logger = logging.getLogger(__name__)
# Importing this module from a regular ``server``/``start`` run that is not in
# test mode is reported as an error, with the import stack attached so the
# offending import can be located.
if odoo.cli.COMMAND in ('server', 'start') and not config['test_enable']:
    _logger.error(
        "Importing test framework"
        ", avoid importing from business modules and when not running in test mode",
        stack_info=True,
    )
else:
    # the stack is only attached when debug logging is enabled
    _logger.info("Importing test framework", stack_info=_logger.isEnabledFor(logging.DEBUG))
# backward compatibility: Form used to be defined in this file
def __getattr__(name):
    """Module-level attribute hook serving the deprecated ``Form`` alias.

    Only ``Form`` is resolved (lazily imported from ``.form``, with a
    ``DeprecationWarning``); any other name raises ``AttributeError``.
    """
    # pylint: disable=import-outside-toplevel
    if name == 'Form':
        from .form import Form

        warnings.warn(
            "Since 18.0: odoo.tests.common.Form is deprecated, use odoo.tests.Form",
            category=DeprecationWarning,
            stacklevel=2,
        )
        return Form

    raise AttributeError(name)
# The odoo library is supposed already configured.
HOST = '127.0.0.1'
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = api.SUPERUSER_ID

# Browser polling: sleep duration between two checks and number of checks;
# BROWSER_WAIT is the resulting total wait.
CHECK_BROWSER_SLEEP = 0.1  # seconds
CHECK_BROWSER_ITERATIONS = 100
BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS  # seconds
DEFAULT_SUCCESS_SIGNAL = 'test successful'
TEST_CURSOR_COOKIE_NAME = 'test_request_key'

# Bound ``search`` of a case-insensitive verbose pattern: returns a match
# object when a message contains one of the alternatives below.
IGNORED_MSGS = re.compile(r"""
    failed\ to\ fetch # base error
    | connectionlosterror: # conversion by offlineFailToFetchErrorHandler
    | assetsloadingerror: # lazy loaded bundle
""", flags=re.VERBOSE | re.IGNORECASE).search
def get_db_name():
    """Return the name of the single database the tests run against.

    The configured ``db_name`` is used when set; otherwise the ``dbname``
    attribute of the current thread is returned when it exists. Exits the
    process when several databases are configured.
    """
    configured = odoo.tools.config['db_name']
    # If the database name is not provided on the command-line,
    # use the one on the thread (which means if it is provided on
    # the command-line, this will break when installing another
    # database from XML-RPC).
    if not configured:
        thread = threading.current_thread()
        if hasattr(thread, 'dbname'):
            return thread.dbname
    if len(configured) > 1:
        sys.exit("-d/--database/db_name has multiple database, please provide a single one")
    return configured[0]
# Registry of standalone test functions: maps a key to the list of functions
# registered under it.
standalone_tests = defaultdict(list)
class RegistryRLock(threading._RLock):
    """Pure-Python re-entrant lock exposing its recursion level as ``count``."""

    # Expose private attribute
    count = property(
        lambda self: self._count,
        doc="Current value of the lock's private ``_count`` attribute.",
    )
# The lock should only be released when new test cursors are meant to be opened.
# Further filtering on cursors can be done by extending `assertCanOpenTestCursor`.
_registry_test_lock = RegistryRLock()
# Taken right away, at import time, by the importing thread.
_registry_test_lock.acquire()
@contextmanager
def release_test_lock():
    """ Releases the test lock in a context manager, the lock is acquired once the context is over.

    If the lock cannot be re-acquired within 60 seconds, the process exits
    with a message naming the current test.

    :raise RuntimeError: if the lock is not held when entering the context
        (raised by the lock's ``release``)
    """
    # Release *before* entering the ``try``: if the release itself fails there
    # is nothing to re-acquire, and the ``finally`` below must not run.
    _registry_test_lock.release()
    try:
        yield
    finally:
        if not _registry_test_lock.acquire(timeout=60):
            tag = odoo.modules.module.current_test.canonical_tag
            # ``sys.exit`` rather than the ``exit`` helper, which is only
            # injected by the ``site`` module for interactive use.
            sys.exit(f'Could not re-acquire the registry lock during {tag}, exiting...')
def standalone(*tags):
    """ Decorator registering a standalone test function.

    Standalone tests are somewhat dedicated to tests that install, upgrade or
    uninstall some modules, which is currently forbidden in regular test
    cases. The decorated function is recorded in ``standalone_tests`` under
    its Odoo module name (when defined in ``odoo.addons.*``), under each of
    the given ``tags`` and under ``'all'``; it is returned unchanged.
    """
    def register(func):
        registration_keys = []
        # register func by odoo module name
        if func.__module__.startswith('odoo.addons.'):
            registration_keys.append(func.__module__.split('.')[2])
        # register func with arbitrary names, if any
        registration_keys.extend(tags)
        registration_keys.append('all')
        for key in registration_keys:
            standalone_tests[key].append(func)
        return func

    return register
def test_xsd(url=None, path=None, skip=False):
    """Decorator factory validating the XML produced by a test method.

    The decorated method is called and its return value is handed to
    ``_validate_xml`` together with ``self.env``, ``url`` and ``path``. The
    wrapper itself always returns ``None``.

    :param url: forwarded as is to ``_validate_xml``
    :param path: forwarded as is to ``_validate_xml``
    :param bool skip: when true, the decorated method is not called at all
    """
    def decorator(func):
        # keep the name, docstring and attributes of the decorated test so
        # that reports and attribute-based tooling still see the real method
        @wraps(func)
        def wrapped_f(self, *args, **kwargs):
            if skip:
                return
            xmls = func(self, *args, **kwargs)
            _validate_xml(self.env, url, path, xmls)
        return wrapped_f
    return decorator
def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs):
    """ Create and return a new ``res.users`` record for tests.

    The user is described by its login and its groups (a comma separated list
    of xml ids). Remaining keyword arguments are passed to ``create`` to
    further customize the user.

    Creation is done with the given ``context`` applied on the environment,
    which allows forcing a specific behavior and/or simplifying record
    creation (e.g. mail-related context keys in mail tests to speedup record
    creation).

    Some values are filled automatically to avoid issues

     * group_ids: the ``group_ids`` keyword when truthy, otherwise the ids of
       the records referenced by ``groups``;
     * name: "login (groups)" by default as it is required;
     * password: the login, padded with ``x`` up to 8 characters;
     * email: the login when it is a valid email, otherwise a generated
       string 'x.x@example.com' (x being the first login letter), as an email
       is required for most odoo operations;
     * company_ids: a link to ``company_id`` when only the latter is given.

    :raise ValueError: if ``login`` or ``groups`` is empty
    """
    if not login:
        raise ValueError('New users require at least a login')
    if not groups:
        raise ValueError('New users require at least user groups')
    context = {} if context is None else context

    explicit_group_ids = kwargs.pop('group_ids', False)
    resolved_group_ids = explicit_group_ids or [
        env.ref(xmlid.strip()).id for xmlid in groups.split(',')
    ]
    values = dict(kwargs, login=login, group_ids=[Command.set(resolved_group_ids)])
    # automatically generate a name as "Login (groups)" to ease user comprehension
    if not values.get('name'):
        values['name'] = f'{login} ({groups})'
    # automatically give a password equal to login
    if not values.get('password'):
        values['password'] = login.ljust(8, 'x')
    # generate email if not given as most test require an email
    if 'email' not in values:
        if single_email_re.match(login):
            values['email'] = login
        else:
            initial = login[0]
            values['email'] = f'{initial}.{initial}@example.com'
    # ensure company_id + allowed company constraint works if not given at create
    if 'company_id' in values and 'company_ids' not in values:
        values['company_ids'] = [(4, values['company_id'])]

    users = env['res.users'].with_context(**context)
    return users.create(values)
def loaded_demo_data(env):
    """Tell whether the ``base.user_demo`` record can be found in ``env``."""
    demo_user = env.ref('base.user_demo', raise_if_not_found=False)
    return bool(demo_user)
class RecordCapturer:
    """Context manager capturing the records of ``model`` that match
    ``domain`` after the block but did not match it when entering.

    The result is available through :attr:`records`, both inside the block
    (computed on the fly) and after a successful exit (computed once).
    """

    def __init__(self, model, domain=None):
        self._model = model
        self._domain = domain or []

    def _search_matching(self):
        """Return the records currently matching the domain, ordered by id."""
        return self._model.search(self._domain, order='id')

    def __enter__(self):
        self._before = self._search_matching()
        self._after = None
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # nothing is captured when the block raised
        if exc_type is None:
            self._after = self._search_matching() - self._before

    @property
    def records(self):
        if self._after is not None:
            return self._after
        return self._search_matching() - self._before
def _enter_context(cm, addcleanup):
    """Enter ``cm`` and register its exit as a cleanup.

    :param cm: the context manager to enter
    :param addcleanup: callable registering a cleanup, called with the exit
        method followed by ``cm, None, None, None``
    :return: the result of entering ``cm``
    :raise TypeError: if the type of ``cm`` lacks ``__enter__`` or ``__exit__``
    """
    # We look up the special methods on the type to match the with
    # statement.
    cm_type = type(cm)
    try:
        enter_method, exit_method = cm_type.__enter__, cm_type.__exit__
    except AttributeError:
        raise TypeError(
            f"'{cm_type.__module__}.{cm_type.__qualname__}' object does "
            f"not support the context manager protocol"
        ) from None
    entered = enter_method(cm)
    addcleanup(exit_method, cm, None, None, None)
    return entered
def _normalize_arch_for_assert(arch_string, parser_method="xml"):
    """Takes some xml and normalize it to make it comparable to other xml
    in particular, blank text is removed, and the output is pretty-printed

    :param str arch_string: the string representing an XML arch
    :param str parser_method: an string representing which lxml.Parser class to use
        when normalizing both archs. Takes either "xml" or "html"
    :return: the normalized arch
    :rtype str:
    :raise ValueError: if ``parser_method`` is neither "xml" nor "html"
    """
    if parser_method == 'xml':
        parser_class = etree.XMLParser
    elif parser_method == 'html':
        parser_class = etree.HTMLParser
    else:
        # fail with an explicit message instead of calling ``None``
        raise ValueError(
            f"Unsupported parser_method {parser_method!r}, expected 'xml' or 'html'"
        )
    parser = parser_class(remove_blank_text=True)
    tree = etree.fromstring(arch_string, parser=parser)
    return etree.tostring(tree, pretty_print=True, encoding='unicode')
class BlockedRequest(requests.exceptions.ConnectionError):
    """Connection error raised when a test attempts an HTTP request that is
    neither local nor a ``file`` url."""
    pass


# Reference to ``requests.Session.send`` captured at import time, used to
# perform the requests that are allowed to go through.
_super_send = requests.Session.send
class BaseCase(case.TestCase):
    """ Subclass of TestCase for Odoo-specific code. This class is abstract and
    expects self.registry, self.cr and self.uid to be initialized by subclasses.
    """
    # Placeholders, left to ``None`` here: concrete test cases provide the
    # actual registry, environment and cursor.
    registry: Registry = None
    env: api.Environment = None
    cr: Cursor = None
311 def __init_subclass__(cls):
312 """Assigns default test tags ``standard`` and ``at_install`` to test
313 cases not having them. Also sets a completely unnecessary
314 ``test_module`` attribute.
315 """
316 super().__init_subclass__()
317 if cls.__module__.startswith('odoo.addons.'):
318 if getattr(cls, 'test_tags', None) is None:
319 cls.test_tags = {'standard', 'at_install'}
320 cls.test_module = cls.__module__.split('.')[2]
    longMessage = True  # more verbose error message by default: https://www.odoo.com/r/Vmh
    warm = True  # False during warm-up phase (see :func:`warmup`)
    _python_version = sys.version_info

    # Number of times a test may be run: the value of the
    # ``ODOO_TEST_FAILURE_RETRIES`` environment variable (0 when unset) plus one.
    _tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1

    # Whether the registry test-mode patches are currently started.
    _registry_patched = False
    # When false, test cursors are never opened read-only, even when a
    # read-only cursor is requested.
    _registry_readonly_enabled = True
    test_cursor_lock_timeout: int = 20
332 def __init__(self, methodName='runTest'):
333 super().__init__(methodName)
334 self.addTypeEqualityFunc(etree._Element, self.assertTreesEqual)
335 self.addTypeEqualityFunc(html.HtmlElement, self.assertTreesEqual)
336 if methodName != 'runTest': 336 ↛ exitline 336 didn't return from function '__init__' because the condition on line 336 was always true
337 self.test_tags = self.test_tags | set(self.get_method_additional_tags(getattr(self, methodName)))
340 @classmethod
341 def _request_handler(cls, s: Session, r: PreparedRequest, /, **kw):
342 # allow localhost requests
343 # TODO: also check port?
344 url = urlsplit(r.url)
345 timeout = kw.get('timeout')
346 if timeout and timeout < 10:
347 _logger.getChild('requests').info('request %s with timeout %s increased to 10s during tests', url, timeout)
348 kw['timeout'] = 10
349 if url.hostname in (HOST, 'localhost'):
350 return _super_send(s, r, **kw)
351 if url.scheme == 'file':
352 return _super_send(s, r, **kw)
354 _logger.getChild('requests').info(
355 "Blocking un-mocked external HTTP request %s %s", r.method, r.url)
356 raise BlockedRequest(f"External requests verboten (was {r.method} {r.url})")
    def run(self, result: OdooTestResult) -> None:
        """Run the test, retrying it up to ``_tests_run_count`` times in total.

        All attempts but the last one are run as "soft" failures with lowered
        logging; the loop stops at the first attempt without failure nor error
        log. When the last attempt leaves ``result`` unsuccessful, auto-retry
        is disabled for all following tests.

        :param result: the result object collecting the outcome of the test
        """
        testMethod = getattr(self, self._testMethodName)

        # retries can be disabled through a falsy ``_retry`` attribute on
        # either the test method or the test case
        if getattr(testMethod, '_retry', True) and getattr(self, '_retry', True):
            tests_run_count = self._tests_run_count
        else:
            tests_run_count = 1
            _logger.info('Auto retry disabled for %s', self)

        for retry in range(tests_run_count):
            result.had_failure = False  # reset in case of retry without soft_fail
            if retry:
                _logger.runbot(f'Retrying a failed test: {self}')
            if retry < tests_run_count-1:
                # not the last attempt: failures are only recorded as soft
                # failures so that the test can be run again
                with warnings.catch_warnings(), \
                        result.soft_fail(), \
                        lower_logging(25, logging.INFO) as quiet_log:
                    super().run(cast(TestResult, result))
                if not (result.had_failure or quiet_log.had_error_log):
                    break
            else:  # last try
                super().run(cast(TestResult, result))
                if not result.wasSuccessful() and BaseCase._tests_run_count != 1:
                    _logger.runbot('Disabling auto-retry after a failed test')
                    # set on BaseCase itself, so it applies to every test case
                    BaseCase._tests_run_count = 1
384 @classmethod
385 def setUpClass(cls):
386 def check_remaining_processes():
387 current_process = psutil.Process()
388 children = current_process.children(recursive=False)
389 for child in children: 389 ↛ 390line 389 didn't jump to line 390 because the loop on line 389 never started
390 _logger.warning('A child process was found, terminating it: %s', child)
391 child.terminate()
392 psutil.wait_procs(children, timeout=10) # mainly to avoid a zombie process that would be logged again at the end.
393 cls.addClassCleanup(check_remaining_processes)
395 def check_remaining_patchers():
396 for patcher in _patch._active_patches: 396 ↛ 397line 396 didn't jump to line 397 because the loop on line 396 never started
397 _logger.warning("A patcher (targeting %s.%s) was remaining active at the end of %s, disabling it...", patcher.target, patcher.attribute, cls.__name__)
398 patcher.stop()
399 cls.addClassCleanup(check_remaining_patchers)
400 super().setUpClass()
401 if 'standard' in cls.test_tags or 'click_all' in cls.test_tags: 401 ↛ exitline 401 didn't return from function 'setUpClass' because the condition on line 401 was always true
402 # if the method is passed directly `patch` discards the session
403 # object which we need
404 # pylint: disable=unnecessary-lambda
405 patcher = patch.object(
406 requests.sessions.Session,
407 'send',
408 lambda s, r, **kwargs: cls._request_handler(s, r, **kwargs),
409 )
410 patcher.start()
411 cls.addClassCleanup(patcher.stop)
413 def setUp(self):
414 super().setUp()
415 self.http_request_key: str = ''
416 self.http_request_allow_all: bool = False
418 def cursor(self):
419 return self.registry.cursor()
421 @property
422 def uid(self):
423 """ Get the current uid. """
424 return self.env.uid
426 @uid.setter
427 def uid(self, user):
428 """ Set the uid by changing the test's environment. """
429 self.env = self.env(user=user)
430 # set the updated environment as the default one
431 self.env.transaction.default_env = self.env
433 def ref(self, xid):
434 """ Returns database ID for the provided :term:`external identifier`,
435 shortcut for ``_xmlid_lookup``
437 :param xid: fully-qualified :term:`external identifier`, in the form
438 :samp:`{module}.{identifier}`
439 :raise: ValueError if not found
440 :returns: registered id
441 """
442 return self.browse_ref(xid).id
444 def browse_ref(self, xid):
445 """ Returns a record object for the provided
446 :term:`external identifier`
448 :param xid: fully-qualified :term:`external identifier`, in the form
449 :samp:`{module}.{identifier}`
450 :raise: ValueError if not found
451 :returns: :class:`~odoo.models.BaseModel`
452 """
453 assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
454 return self.env.ref(xid)
456 def patch(self, obj, key, val):
457 """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
458 patcher = patch.object(obj, key, val) # this is unittest.mock.patch
459 patcher.start()
460 self.addCleanup(patcher.stop)
462 @classmethod
463 def classPatch(cls, obj, key, val):
464 """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
465 patcher = patch.object(obj, key, val) # this is unittest.mock.patch
466 patcher.start()
467 cls.addClassCleanup(patcher.stop)
469 def startPatcher(self, patcher):
470 mock = patcher.start()
471 self.addCleanup(patcher.stop)
472 return mock
474 @classmethod
475 def startClassPatcher(cls, patcher):
476 mock = patcher.start()
477 cls.addClassCleanup(patcher.stop)
478 return mock
480 def enterContext(self, cm):
481 """Enters the supplied context manager.
483 If successful, also adds its __exit__ method as a cleanup
484 function and returns the result of the __enter__ method.
485 """
486 return _enter_context(cm, self.addCleanup)
488 @classmethod
489 def enterClassContext(cls, cm):
490 """Same as enterContext, but class-wide."""
491 return _enter_context(cm, cls.addClassCleanup)
493 @contextmanager
494 def with_user(self, login):
495 """ Change user for a given test, like with self.with_user() ... """
496 old_uid = self.uid
497 old_env = self.env
498 try:
499 user = self.env['res.users'].sudo().search([('login', '=', login)])
500 assert user, "Login %s not found" % login
501 # switch user
502 self.uid = user.id
503 self.env = self.env(user=self.uid)
504 yield
505 finally:
506 # back
507 self.uid = old_uid
508 self.env = old_env
510 @contextmanager
511 def debug_mode(self):
512 """ Enable the effects of debug mode (in particular for group ``base.group_no_one``). """
513 request = Mock(
514 httprequest=Mock(host='localhost'),
515 db=self.env.cr.dbname,
516 env=self.env,
517 session=DotDict(odoo.http.get_default_session(), debug='1'),
518 )
519 try:
520 self.env.flush_all()
521 self.env.invalidate_all()
522 odoo.http._request_stack.push(request)
523 yield
524 self.env.flush_all()
525 self.env.invalidate_all()
526 finally:
527 popped_request = odoo.http._request_stack.pop()
528 if popped_request is not request:
529 raise Exception('Wrong request stack cleanup.')
    @contextmanager
    def _assertRaises(self, exception, *, msg=None):
        """ Context manager that clears the environment upon failure.

        Wraps the standard ``assertRaises`` context manager in a savepoint of
        the test cursor (when an environment is set).

        :param exception: the expected exception class
        :param msg: optional message forwarded to ``assertRaises``
        :returns: yields the context manager returned by ``assertRaises``
        """
        # ``init`` protects the initialisation only: its cleanups are handed
        # over to ``inner`` below once ``assertRaises`` has been entered
        with ExitStack() as init:
            if self.env:
                init.enter_context(self.env.cr.savepoint())
                if issubclass(exception, AccessError):
                    # The savepoint() above calls flush(), which leaves the
                    # record cache with lots of data. This can prevent
                    # access errors to be detected. In order to avoid this
                    # issue, we clear the cache before proceeding.
                    self.env.cr.clear()

            with ExitStack() as inner:
                cm = inner.enter_context(super().assertRaises(exception, msg=msg))
                # *moves* the cleanups from init to inner, this ensures the
                # savepoint gets rolled back when `yield` raises `exception`,
                # but still allows the initialisation to be protected *and* not
                # interfered with by `assertRaises`.
                inner.push(init.pop_all())

                yield cm
554 def assertRaises(self, exception, func=None, *args, **kwargs):
555 if func: 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true
556 with self._assertRaises(exception):
557 func(*args, **kwargs)
558 else:
559 return self._assertRaises(exception, **kwargs)
    def _patchExecute(self, actual_queries, flush=True):
        """Generator recording the queries executed while it is suspended.

        Meant to be delegated to with ``yield from`` by the context managers
        checking queries. ``Cursor.execute`` is patched to append each query
        (the ``code`` of ``SQL`` objects, the query itself otherwise) to
        ``actual_queries`` before executing it.

        :param list actual_queries: list receiving the executed queries
        :param bool flush: whether to flush the environment and the cursor
            before and after the recorded section
        """
        Cursor_execute = Cursor.execute

        def execute(self, query, params=None, log_exceptions=None):
            # record, then delegate to the original ``execute``
            actual_queries.append(query.code if isinstance(query, SQL) else query)
            return Cursor_execute(self, query, params, log_exceptions)

        if flush:
            self.env.flush_all()
            self.env.cr.flush()

        with (
            patch('odoo.sql_db.Cursor.execute', execute),
            # ``unaccent`` is replaced by the identity while recording
            patch.object(self.env.registry, 'unaccent', lambda x: x),
        ):
            yield actual_queries
            # the final flush happens while still patched, so the queries it
            # triggers are recorded too
            if flush:
                self.env.flush_all()
                self.env.cr.flush()
581 @contextmanager
582 def assertQueries(self, expected, flush=True):
583 """ Check the queries made by the current cursor. ``expected`` is a list
584 of strings representing the expected queries being made. Query strings
585 are matched against each other, ignoring case and whitespaces.
586 """
587 actual_queries = []
589 yield from self._patchExecute(actual_queries, flush)
591 if not self.warm:
592 return
594 self.assertEqual(
595 len(actual_queries), len(expected),
596 "\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
597 "\n".join(actual_queries), "\n".join(expected),
598 )
599 )
600 for actual_query, expect_query in zip(actual_queries, expected):
601 self.assertEqual(
602 "".join(actual_query.lower().split()),
603 "".join(expect_query.lower().split()),
604 "\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query),
605 )
607 @contextmanager
608 def assertQueriesContain(self, expected, flush=True):
609 """ Check the queries made by the current cursor. ``expected`` is a list
610 of strings representing the expected queries being made. Query strings
611 are matched against each other, ignoring case and whitespaces.
612 """
613 actual_queries = []
615 yield from self._patchExecute(actual_queries, flush)
617 if not self.warm:
618 return
620 self.assertEqual(
621 len(actual_queries), len(expected),
622 "\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
623 "\n".join(actual_queries), "\n".join(expected),
624 )
625 )
626 for actual_query, expect_query in zip(actual_queries, expected):
627 self.assertIn(
628 "".join(expect_query.lower().split()),
629 "".join(actual_query.lower().split()),
630 "\n---- actual query:\n%s\n---- doesn't contain:\n%s" % (actual_query, expect_query),
631 )
    @contextmanager
    def assertQueryCount(self, default=0, flush=True, **counters):
        """ Context manager that counts queries. It may be invoked either with
        one value, or with a set of named arguments like ``login=value``::

            with self.assertQueryCount(42):
                ...

            with self.assertQueryCount(admin=3, demo=5):
                ...

        The second form is convenient when used with :func:`users`.

        More queries than expected fail the test (within a subtest, so the
        test method goes on); fewer queries are only logged. During the
        warm-up phase nothing is counted.

        :param int default: expected count when the login of the current user
            is not among ``counters``
        :param bool flush: whether to flush the environment and the cursor
            before and after the block
        :param counters: expected counts by user login
        """
        if self.warm:
            # mock random in order to avoid random bus gc
            with patch('random.random', lambda: 1):
                login = self.env.user.login
                expected = counters.get(login, default)
                if flush:
                    self.env.flush_all()
                    self.env.cr.flush()
                count0 = self.cr.sql_log_count
                yield
                if flush:
                    self.env.flush_all()
                    self.env.cr.flush()
                count = self.cr.sql_log_count - count0
                if count != expected:
                    # add some info on caller to allow semi-automatic update of query count
                    # (index 2 is presumably the frame holding the ``with``
                    # statement, past this generator and the contextmanager
                    # machinery -- NOTE(review): confirm)
                    _frame, filename, linenum, funcname, _lines, _index = inspect.stack()[2]
                    filename = filename.replace('\\', '/')
                    if "/odoo/addons/" in filename:
                        filename = filename.rsplit("/odoo/addons/", 1)[1]
                    if count > expected:
                        msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s"
                        # add a subtest in order to continue the test_method in case of failures
                        with self.subTest():
                            self.fail(msg % (login, count, expected, funcname, filename, linenum))
                    else:
                        # logged on the logger of the module defining the test
                        logger = logging.getLogger(type(self).__module__)
                        msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s"
                        logger.info(msg, login, count, expected, funcname, filename, linenum)
        else:
            # flush before and after during warmup, in order to reproduce the
            # same operations, otherwise the caches might not be ready!
            if flush:
                self.env.flush_all()
                self.env.cr.flush()
            yield
            if flush:
                self.env.flush_all()
                self.env.cr.flush()
686 def assertRecordValues(
687 self,
688 records: odoo.models.BaseModel,
689 expected_values: list[dict],
690 *,
691 field_names: Optional[Iterable[str]] = None,
692 ) -> None:
693 ''' Compare a recordset with a list of dictionaries representing the expected results.
694 This method performs a comparison element by element based on their index.
695 Then, the order of the expected values is extremely important.
697 .. note::
699 - ``None`` expected values can be used for empty fields.
700 - x2many fields are expected by ids (so the expected value should be
701 a ``list[int]``
702 - many2one fields are expected by id (so the expected value should
703 be an ``int``
705 :param records: The records to compare.
706 :param expected_values: Items to check the ``records`` against.
707 :param field_names: list of fields to check during comparison, if
708 unspecified all expected_values must have the same
709 keys and all are checked
710 '''
711 if not field_names:
712 field_names = expected_values[0].keys()
713 for i, v in enumerate(expected_values):
714 self.assertEqual(
715 v.keys(), field_names,
716 f"All expected values must have the same keys, found differences between records 0 and {i}",
717 )
719 expected_reformatted = []
720 for vs in expected_values:
721 r = {}
722 for f in field_names:
723 t = records._fields[f].type
724 if t in ('one2many', 'many2many'):
725 r[f] = sorted(vs[f])
726 elif t == 'float':
727 r[f] = float(vs[f])
728 elif t == 'integer':
729 r[f] = int(vs[f])
730 elif vs[f] is None:
731 r[f] = False
732 else:
733 r[f] = vs[f]
734 expected_reformatted.append(r)
736 record_reformatted = []
737 for record in records:
738 r = {}
739 for field_name in field_names:
740 record_value = record[field_name]
741 match record._fields[field_name]:
742 case odoo.fields.Many2one():
743 record_value = record_value.id
744 case odoo.fields.One2many() | odoo.fields.Many2many():
745 record_value = sorted(record_value.ids)
746 case odoo.fields.Float() as field if digits := field.get_digits(record.env):
747 record_value = Approx(record_value, digits[1], decorate=False)
748 case odoo.fields.Monetary() as field if currency_field_name := field.get_currency_field(record):
749 # don't round if there's no currency set
750 if c := record[currency_field_name]:
751 record_value = Approx(record_value, c, decorate=False)
753 r[field_name] = record_value
754 record_reformatted.append(r)
756 try:
757 self.assertSequenceEqual(expected_reformatted, record_reformatted, seq_type=list)
758 return
759 except AssertionError as e:
760 standardMsg, _, diffMsg = str(e).rpartition('\n')
761 if 'self.maxDiff' not in diffMsg:
762 raise
763 # move out of handler to avoid exception chaining
765 diffMsg = "".join(difflib.unified_diff(
766 pprint.pformat(expected_reformatted).splitlines(keepends=True),
767 pprint.pformat(record_reformatted).splitlines(keepends=True),
768 fromfile="expected", tofile="records",
769 ))
770 self.fail(self._formatMessage(None, standardMsg + '\n' + diffMsg))
772 # turns out this thing may not be quite as useful as we thought...
773 def assertItemsEqual(self, a, b, msg=None):
774 self.assertCountEqual(a, b, msg=None)
776 def assertTreesEqual(self, n1, n2, msg=None):
777 self.assertIsNotNone(n1, msg)
778 self.assertIsNotNone(n2, msg)
779 self.assertEqual(n1.tag, n2.tag, msg)
780 # Because lxml.attrib is an ordereddict for which order is important
781 # to equality, even though *we* don't care
782 self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg)
783 self.assertEqual((n1.text or u'').strip(), (n2.text or u'').strip(), msg)
784 self.assertEqual((n1.tail or u'').strip(), (n2.tail or u'').strip(), msg)
786 for c1, c2 in zip_longest(n1, n2):
787 self.assertTreesEqual(c1, c2, msg)
789 def _assertXMLEqual(self, original, expected, parser="xml"):
790 """Asserts that two xmls archs are equal
792 :param original: the xml arch to test
793 :type original: str
794 :param expected: the xml arch of reference
795 :type expected: str
796 :param parser: an string representing which lxml.Parser class to use
797 when normalizing both archs. Takes either "xml" or "html"
798 :type parser: str
799 """
800 self.maxDiff = 10000
801 if original:
802 original = _normalize_arch_for_assert(original, parser)
803 if expected:
804 expected = _normalize_arch_for_assert(expected, parser)
805 self.assertEqual(original, expected)
807 def assertXMLEqual(self, original, expected):
808 return self._assertXMLEqual(original, expected)
810 def assertHTMLEqual(self, original, expected):
811 return self._assertXMLEqual(original, expected, 'html')
813 def profile(self, description='', **kwargs):
814 test_method = getattr(self, '_testMethodName', 'Unknown test method')
815 if not hasattr(self, 'profile_session'):
816 self.profile_session = profiler.make_session(test_method)
817 if 'db' not in kwargs:
818 kwargs['db'] = self.env.cr.dbname
819 return profiler.Profiler(
820 description='%s uid:%s %s %s' % (test_method, self.env.user.id, 'warm' if self.warm else 'cold', description),
821 profile_session=self.profile_session,
822 **kwargs)
824 @classmethod
825 def _registry_test_mode_patches(cls, *, cr: Cursor, registry: Registry):
826 """
827 Returns the patches required for entering registry test mode.
828 The patches are not started.
829 """
830 def _patched_cursor(readonly: bool = False):
831 return test_cursor.TestCursor(
832 cr, _registry_test_lock, readonly and cls._registry_readonly_enabled
833 )
834 return [
835 # New cursor should point to the test's cursor
836 patch.object(registry, 'cursor', _patched_cursor),
837 # Disable locking and signaling
838 patch.object(Registry, '_lock', DummyRLock()),
839 patch.object(registry, 'setup_signaling', return_value=None), #noop
840 patch.object(registry, 'check_signaling', return_value=registry),
841 ]
843 @classmethod
844 def registry_enter_test_mode_cls(cls):
845 """
846 Puts the registry in test mode.
848 New cursors returned by the registry will be instances of `TestCursor`
849 which will wrap the current cursor.
850 """
851 assert not cls._registry_patched, 'Can only patch registry once'
852 assert cls.cr, 'No cursor'
853 assert cls.registry, 'No registry'
855 cls.registry_patches = cls._registry_test_mode_patches(
856 cr=cls.cr, registry=cls.registry,
857 )
858 for p in cls.registry_patches:
859 p.start()
860 cls._registry_patched = True
861 cls.addClassCleanup(cls.registry_leave_test_mode)
863 def registry_enter_test_mode(self, *, cr: Cursor | None = None, register_cleanup: bool = True) -> None:
864 """
865 Puts the registry in test mode.
867 New cursors returned by the registry will be instances of `TestCursor`
868 which will wrap the current cursor.
870 :param cr: the cursor to wrap (defaults to the current cursor if none)
871 :param register_cleanup: whether to register cleanup.
872 """
873 assert not type(self)._registry_patched, 'Can only patch registry once'
874 assert cr or self.cr, 'No cursor'
875 assert self.registry, 'No registry'
877 type(self).registry_patches = self._registry_test_mode_patches(
878 cr=cr or self.cr, registry=self.registry,
879 )
880 for p in self.registry_patches:
881 p.start()
882 type(self)._registry_patched = True
883 if register_cleanup:
884 self.addCleanup(self.registry_leave_test_mode)
886 @classmethod
887 def registry_leave_test_mode(cls):
888 assert cls._registry_patched, 'Registry is not patched'
890 for p in cls.registry_patches:
891 p.stop()
892 cls.registry_patches.clear()
893 cls._registry_patched = False
895 @classmethod
896 def set_registry_readonly_mode(cls, enabled: bool):
897 assert cls._registry_patched, 'Registry is not patched'
899 cls._registry_readonly_enabled = enabled
901 def assertCanOpenTestCursor(self):
902 """ Asserts that we can currently open a test cursor. """
903 if odoo.modules.module.current_test != self:
904 message = f"Trying to open a test cursor for {self.canonical_tag} while already in a test {odoo.modules.module.current_test.canonical_tag}"
905 _logger.runbot(message)
906 raise BadRequest(message)
907 request = odoo.http.request
908 if not request or self.http_request_allow_all:
909 return
910 http_request_required_key = self.http_request_key
911 http_request_key = request.cookies.get(TEST_CURSOR_COOKIE_NAME)
912 if http_request_key != http_request_required_key:
913 expected = http_request_required_key
914 if not expected:
915 expected = 'None (request are not enabled)'
916 _logger.runbot(
917 'Request with path %s has been ignored during test as it '
918 'it does not contain the test_cursor cookie or it is expired.'
919 ' (required "%s", got "%s")',
920 request.httprequest.path, expected, http_request_key
921 )
922 raise BadRequest(
923 'Request ignored during test as it does not contain the required cookie.'
924 )
926 def get_method_additional_tags(self, test_method):
927 """Guess if the test_methods is a query_count and adds an `is_query_count` tag on the test
928 """
929 additional_tags = []
930 if odoo.tools.config['test_tags'] and 'is_query_count' in odoo.tools.config['test_tags']: 930 ↛ 931line 930 didn't jump to line 931 because the condition on line 930 was never true
931 method_source = inspect.getsource(test_method) if test_method else ''
932 if 'self.assertQueryCount' in method_source:
933 additional_tags.append('is_query_count')
934 return additional_tags
class Like:
    """
    A string-like object comparable to other strings but where the substring
    '...' can match anything in the other string.

    Leading and trailing whitespace is ignored, both around each literal part
    of the pattern and around the compared string. Comparing with anything
    that is not a string is simply unequal.

    Example of usage:

        self.assertEqual("SELECT field1, field2, field3 FROM model", Like('SELECT ... FROM model'))
        self.assertIn(Like('Company ... (SF)'), ['TestPartner', 'Company 8 (SF)', 'SomeAdress'])
        self.assertEqual([
            'TestPartner',
            'Company 8 (SF)',
            'Anything else'
        ], [
            'TestPartner',
            Like('Company ... (SF)'),
            Like('...'),
        ])

    In case of mismatch, the failure message shows the pattern through its
    ``repr`` (the ``repr`` of the pattern string).
    """
    def __init__(self, pattern):
        self.pattern = pattern
        # every literal chunk is escaped, '...' becomes a greedy wildcard
        self.regex = '.*'.join(re.escape(part.strip()) for part in pattern.split('...'))

    def __eq__(self, other):
        # Let Python fall back to its default handling for non-strings
        # instead of failing on ``other.strip()``.
        if not isinstance(other, str):
            return NotImplemented
        # Return a real boolean rather than a ``re.Match`` / ``None``.
        return re.fullmatch(self.regex, other.strip(), re.DOTALL) is not None

    def __repr__(self):
        return repr(self.pattern)
class WhitespaceInsensitive(str):
    """A string whose equality and hash ignore the amount of whitespace.

    Every run of whitespace is treated as a single space for ``==``, ``!=``
    and ``hash``. Comparison with non-strings is left to Python's default
    handling.
    """
    __slots__ = ()

    def __hash__(self):
        return hash(re.sub(r'\s+', ' ', self))

    def __eq__(self, other):
        if not isinstance(other, str):
            return NotImplemented
        return re.sub(r'\s+', ' ', self) == re.sub(r'\s+', ' ', other)

    def __ne__(self, other):
        # ``str`` provides its own ``__ne__``; without this override ``!=``
        # would compare the raw strings and disagree with ``__eq__``.
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome
class Approx:  # noqa: PLW1641
    """Wrap a float so that ``==`` performs an approximate comparison.

    The comparison is done by ``float_compare`` when ``rounding`` is a number,
    or by the ``compare_amounts`` method of ``rounding`` otherwise.

    Most of the time, :meth:`TestCase.assertAlmostEqual` is more useful, but it
    doesn't work for all helpers.
    """
    def __init__(self, value: float, rounding: int | float | odoo.addons.base.models.res_currency.ResCurrency, /, decorate: bool) -> None:  # noqa: PYI041
        self.value = value
        self.decorate = decorate
        if isinstance(rounding, int):
            # an integer is a number of digits
            comparator = partial(float_compare, precision_digits=rounding)
        elif isinstance(rounding, float):
            # a float is a rounding precision
            comparator = partial(float_compare, precision_rounding=rounding)
        else:
            comparator = rounding.compare_amounts
        self.cmp = comparator

    def __repr__(self) -> str:
        marker = "~" if self.decorate else ""
        return f"{marker}{self.value!r}"

    def __eq__(self, other: object) -> bool | NotImplemented:
        if isinstance(other, (float, int)):
            return self.cmp(self.value, other) == 0
        return NotImplemented
class TransactionCase(BaseCase):
    """ Test class in which all test methods are run in a single transaction,
    but each test method is run in a sub-transaction managed by a savepoint.
    The transaction's cursor is always closed without committing.

    The data setup common to all methods should be done in the class method
    `setUpClass`, so that it is done once for all test methods. This is useful
    for test cases containing fast tests but with significant database setup
    common to all cases (complex in-db test data).

    After being run, each test method cleans up the record cache and the
    registry cache. However, there is no cleanup of the registry models and
    fields. If a test modifies the registry (custom models and/or fields), it
    should prepare the necessary cleanup (`self.registry.reset_changes()`).
    """
    # context manager / decorator muting the registry logger
    muted_registry_logger = mute_logger(odoo.orm.registry._logger.name)
    # optional patcher started at class level by ``setUpClass`` when set
    freeze_time = None

    @classmethod
    def _gc_filestore(cls):
        """Garbage-collect the filestore using a fresh cursor on the test database."""
        # Attachments can be created or unlinked during the tests.
        # They can add up during tests and take some disk space.
        # Since crons are not running during tests, we need to gc manually.
        # We need to check the status of the file system outside of the test cursor.
        with Registry(get_db_name()).cursor() as cr:
            gc_env = api.Environment(cr, api.SUPERUSER_ID, {})
            gc_env['ir.attachment']._gc_file_store_unsafe()

    @classmethod
    def setUpClass(cls):
        """Open the class-wide cursor and environment and install the test patches.

        Class cleanups are registered as the setup progresses.
        """
        super().setUpClass()
        cls.addClassCleanup(cls._gc_filestore)
        cls.registry = Registry(get_db_name())
        # snapshot of the registry state, restored by ``reset_changes`` below
        cls.registry_start_invalidated = cls.registry.registry_invalidated
        cls.registry_start_sequence = cls.registry.registry_sequence
        cls.registry_cache_sequences = dict(cls.registry.cache_sequences)

        def reset_changes():
            # set the models up again if the registry changed, then restore
            # the state captured at the beginning of ``setUpClass``
            if (cls.registry_start_sequence != cls.registry.registry_sequence) or cls.registry.registry_invalidated:
                with cls.registry.cursor() as cr:
                    cls.registry._setup_models__(cr)
            cls.registry.registry_invalidated = cls.registry_start_invalidated
            cls.registry.registry_sequence = cls.registry_start_sequence
            with cls.muted_registry_logger:
                cls.registry.clear_all_caches()
            cls.registry.cache_invalidated.clear()
            cls.registry.cache_sequences = cls.registry_cache_sequences
        cls.addClassCleanup(reset_changes)

        def signal_changes():
            # in-memory replacement for ``registry.signal_changes``: it only
            # bumps the sequences and resets the invalidation flags
            if not cls.registry.ready:
                _logger.info('Skipping signal changes during tests')
                return
            if cls.registry.registry_invalidated or cls.registry.cache_invalidated:
                _logger.info('Simulating signal changes during tests')
                if cls.registry.registry_invalidated:
                    cls.registry.registry_sequence += 1
                for cache_name in cls.registry.cache_invalidated or ():
                    cls.registry.cache_sequences[cache_name] += 1
                cls.registry.registry_invalidated = False
                cls.registry.cache_invalidated.clear()

        cls._signal_changes_patcher = patch.object(cls.registry, 'signal_changes', signal_changes)
        cls.startClassPatcher(cls._signal_changes_patcher)

        cls.cr = cls.registry.cursor()
        # the bound ``close`` is captured here, before ``close`` gets patched below
        cls.addClassCleanup(cast(Cursor, cls.cr).close)

        def check_cursor_stack():
            # mark leftover test cursors as closed and empty the stack
            for cursor in test_cursor.TestCursor._cursors_stack:
                _logger.info('One curor was remaining in the TestCursor stack at the end of the test')
                cursor._closed = True
            test_cursor.TestCursor._cursors_stack = []

        cls.addClassCleanup(check_cursor_stack)

        if cls.freeze_time:
            cls.startClassPatcher(cls.freeze_time)

        def forbidden(*args, **kwars):
            # replacement for commit/rollback/close on the class cursor
            traceback.print_stack()
            raise AssertionError('Cannot commit or rollback a cursor from inside a test, this will lead to a broken cursor when trying to rollback the test. Please rollback to a specific savepoint instead or open another cursor if really necessary')

        cls.commit_patcher = patch.object(cls.cr, 'commit', forbidden)
        cls.startClassPatcher(cls.commit_patcher)
        cls.rollback_patcher = patch.object(cls.cr, 'rollback', forbidden)
        cls.startClassPatcher(cls.rollback_patcher)
        cls.close_patcher = patch.object(cls.cr, 'close', forbidden)
        cls.startClassPatcher(cls.close_patcher)

        cls.env = api.Environment(cls.cr, api.SUPERUSER_ID, {})

        # Speed up CryptContext. Many users and passwords are created during
        # tests; avoid spending time hashing passwords with many rounds.
        def _crypt_context(self):  # noqa: ARG001
            return CryptContext(
                ['pbkdf2_sha512', 'plaintext'],
                pbkdf2_sha512__rounds=1,
            )
        cls._crypt_context_patcher = patch('odoo.addons.base.models.res_users.ResUsersPatchedInTest._crypt_context', _crypt_context)
        cls.startClassPatcher(cls._crypt_context_patcher)

    def setUp(self):
        """Register the per-test cleanups, then open the savepoint isolating the test."""
        super().setUp()

        def _check_registry_lock():
            # warn when the lock count is not exactly 1 at the end of the test
            if _registry_test_lock.count == 0:
                _logger.warning('The registry test lock is still released at the end of %s', self.canonical_tag)
            elif _registry_test_lock.count > 1:
                _logger.warning(
                    'The registry test lock was acquired more than once (%s) at the end of %s',
                    _registry_test_lock.count, self.canonical_tag,
                )

        self.addCleanup(_check_registry_lock)
        # restore environments after the test to avoid invoking flush() with an
        # invalid environment (inexistent user id) from another test
        envs = self.env.transaction.envs
        for env in list(envs):
            self.addCleanup(env.clear)
        # restore the set of known environments as it was at setUp
        self.addCleanup(envs.update, list(envs))
        self.addCleanup(envs.clear)

        self.addCleanup(self.muted_registry_logger(self.registry.clear_all_caches))

        # This prevents precommit functions and data from piling up
        # until cr.flush is called in 'assertRaises' clauses
        # (these are not cleared in self.env.clear or envs.clear)
        cr = self.env.cr

        def _reset(cb, funcs, data):
            # put back the callbacks and data captured at setUp time
            cb._funcs = funcs
            cb.data = data
        for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]:
            self.addCleanup(_reset, callback, deque(callback._funcs), deepcopy(callback.data))

        # flush everything in setUpClass before introducing a savepoint
        self.env.flush_all()

        savepoint = Savepoint(self.cr)
        self.addCleanup(savepoint.close)

    @contextmanager
    def enter_registry_test_mode(self):
        """
        Make so that all new cursors opened on this database registry reuse the
        one currently used by the tests. See ``registry_enter_test_mode``.

        Test mode is left and the environment is invalidated when the context
        exits, even on error.
        """
        # entering the test mode should flush/invalidate all changes in the
        # current environment because changes happen inside other cursors
        env = self.env
        env.flush_all()
        self.registry_enter_test_mode(register_cleanup=False)
        try:
            yield
        finally:
            self.registry_leave_test_mode()
            env.invalidate_all()

    @contextmanager
    def allow_pdf_render(self):
        """
        Allows wkhtmltopdf to send requests to the backend.
        Enters registry mode if necessary.
        """
        with ExitStack() as stack:
            if not type(self)._registry_patched:
                stack.enter_context(self.enter_registry_test_mode())
            old_run_wkhtmltopdf = ir_actions_report._run_wkhtmltopdf

            def _patched_run_wkhtmltopdf(args):
                # run wkhtmltopdf with the test cursor cookie while the test
                # lock is released and 'wkhtmltopdf' is the accepted request key
                with patch.object(self, 'http_request_key', 'wkhtmltopdf'), release_test_lock():
                    args = ['--cookie', TEST_CURSOR_COOKIE_NAME, 'wkhtmltopdf', *args]
                    return old_run_wkhtmltopdf(args)

            stack.enter_context(
                patch.object(ir_actions_report, '_run_wkhtmltopdf', _patched_run_wkhtmltopdf)
            )
            yield
class SingleTransactionCase(BaseCase):
    """ TestCase in which all test methods are run in the same transaction,
    the transaction is started with the first test method and rolled back at
    the end of the last.
    """
    @classmethod
    def __init_subclass__(cls):
        """Warn about subclasses that also inherit from ``TransactionCase``."""
        super().__init_subclass__()
        if issubclass(cls, TransactionCase):
            # the format string needs the offending class as argument
            _logger.warning("%s inherits from both TransactionCase and SingleTransactionCase", cls.__name__)

    @classmethod
    def setUpClass(cls):
        """Open the cursor and environment shared by every test method.

        Resetting the registry changes, clearing its caches and closing the
        cursor are registered as class cleanups.
        """
        super().setUpClass()
        cls.registry = Registry(get_db_name())
        cls.addClassCleanup(cls.registry.reset_changes)
        cls.addClassCleanup(cls.registry.clear_all_caches)

        cls.cr = cls.registry.cursor()
        cls.addClassCleanup(cast(Cursor, cls.cr).close)

        cls.env = api.Environment(cls.cr, api.SUPERUSER_ID, {})

    def setUp(self):
        """Flush pending changes of the shared environment before each test."""
        super().setUp()
        self.env.flush_all()
class ChromeBrowserException(Exception):
    """Error reported by, or while driving, the Chrome browser."""
def run(gen_func):
    """Drive a generator-based coroutine that yields ``Future`` objects.

    The generator is created and advanced right away. Each time a yielded
    future settles, its result is sent back into the generator, or its
    exception is thrown into it, until the generator is exhausted.

    :param gen_func: zero-argument callable returning a generator of futures
    :return: always ``None``
    """
    def _resume(settled):
        try:
            try:
                value = settled.result()
            except Exception as exc:
                upcoming = coro.throw(exc)
            else:
                upcoming = coro.send(value)
        except StopIteration:
            # the coroutine is over
            return

        assert isinstance(upcoming, Future), f"coroutine must yield futures, got {upcoming}"
        upcoming.add_done_callback(_resume)

    coro = gen_func()
    try:
        first = next(coro)
        first.add_done_callback(_resume)
    except StopIteration:
        return
def save_test_file(test_name, content, prefix, extension='png', logger=_logger, document_type='Screenshot', date_format="%Y%m%d_%H%M%S_%f"):
    """Write ``content`` to a timestamped file in the screenshots directory.

    The file goes to ``<screenshots config>/<db name>/screenshots`` and its
    location is reported through ``logger.runbot``.

    :param test_name: name of the test, word characters only
    :param content: bytes to write
    :param prefix: file name prefix, word characters ending with ``_``
    :param extension: lowercase file extension, without the dot
    :param logger: logger used to report the file location
    :param document_type: label used in the log message
    :param date_format: ``strftime`` format of the timestamp in the file name
    """
    assert re.fullmatch(r'\w*_', prefix)
    assert re.fullmatch(r'[a-z]+', extension)
    assert re.fullmatch(r'\w+', test_name)
    timestamp = datetime.now().strftime(date_format)
    target_dir = pathlib.Path(odoo.tools.config['screenshots']) / get_db_name() / 'screenshots'
    target_dir.mkdir(parents=True, exist_ok=True)
    destination = target_dir / f'{prefix}{timestamp}_{test_name}.{extension}'
    destination.write_bytes(content)
    logger.runbot(f'{document_type} in: {destination}')
if os.name != 'posix' or platform.system() == 'Darwin':
    # no pre-exec hook outside of non-macOS POSIX systems
    _preexec = None
else:
    # since the introduction of pointer compression in Chrome 80 (v8 v8.0),
    # the memory reservation algorithm requires more than 8GiB of
    # virtual mem for alignment this exceeds our default memory limits.
    def _preexec():
        """Lift the address-space limit of the process about to be executed."""
        import resource  # noqa: PLC0415
        unlimited = (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
        resource.setrlimit(resource.RLIMIT_AS, unlimited)
1276class ChromeBrowser:
1277 """ Helper object to control a Chrome headless process. """
1278 remote_debugging_port = 0 # 9222, change it in a non-git-tracked file
    def __init__(self, test_case: HttpCase, success_signal: str = DEFAULT_SUCCESS_SIGNAL, headless: bool = True, debug: bool = False):
        """Start a Chrome process, connect to it and configure the page.

        :param test_case: the test case driving the browser; its logger,
            ``browser_size`` and ``touch_enabled`` are used
        :param success_signal: console message that marks the run as successful
        :param headless: forwarded to ``_chrome_start``
        :param debug: forwarded to ``_chrome_start``
        :raises unittest.SkipTest: when the ``websocket`` module is missing
        """
        self.throttling_factor = 1
        self._logger = test_case._logger
        self.test_case = test_case
        self.success_signal = success_signal
        if websocket is None:
            self._logger.warning("websocket-client module is not installed")
            raise unittest.SkipTest("websocket-client module is not installed")
        # temporary Chrome profile directory, removed by ``stop``
        self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo')

        if scs := odoo.tools.config['screencasts']:
            self.screencaster = Screencaster(self, scs)
        else:
            self.screencaster = NoScreencast()

        if os.name == 'posix':
            # remember the previous SIGXCPU handler so ``stop`` can restore it
            self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU)
            signal.signal(signal.SIGXCPU, self.signal_handler)
        else:
            self.sigxcpu_handler = None

        # normalize "WxH" into "W,H"; note that this updates the test case itself
        test_case.browser_size = test_case.browser_size.replace('x', ',')

        self.chrome, self.devtools_port = self._chrome_start(
            user_data_dir=self.user_data_dir,
            touch_enabled=test_case.touch_enabled,
            headless=headless,
            debug=debug,
        )
        self.ws = self._open_websocket()
        self._request_id = itertools.count()
        # outcome of the browser run, settled by the event handlers
        self._result = Future()
        self.error_checker = None
        self.had_failure = False
        # maps request_id to Futures
        self._responses = {}
        # maps frame ids to callbacks
        self._frames = {}
        # maps event (notification) method names to their handler
        self._handlers = {
            'Fetch.requestPaused': self._handle_request_paused,
            'Runtime.consoleAPICalled': self._handle_console,
            'Runtime.exceptionThrown': self._handle_exception,
            'Page.frameStoppedLoading': self._handle_frame_stopped_loading,
            'Page.screencastFrame': self.screencaster,
        }
        # background thread consuming the websocket messages
        self._receiver = threading.Thread(
            target=self._receive,
            name="WebSocket events consumer",
            args=(get_db_name(),)
        )
        self._receiver.start()
        self._logger.info('Enable chrome headless console log notification')
        self._websocket_send('Runtime.enable')
        self._websocket_request('Fetch.enable')
        self._logger.info('Chrome headless enable page notifications')
        self._websocket_send('Page.enable')
        self._websocket_send('Page.setDownloadBehavior', params={
            'behavior': 'deny',
            'eventsEnabled': False,
        })
        self._websocket_send('Emulation.setFocusEmulationEnabled', params={'enabled': True})
        emulated_device = {
            'mobile': False,
            'width': None,
            'height': None,
            'deviceScaleFactor': 1,
        }
        # width and height come from the normalized "W,H" browser size
        emulated_device['width'], emulated_device['height'] = [int(size) for size in test_case.browser_size.split(",")]
        self._websocket_request('Emulation.setDeviceMetricsOverride', params=emulated_device)
1350 def signal_handler(self, sig, frame):
1351 if sig == signal.SIGXCPU:
1352 _logger.info('CPU time limit reached, stopping Chrome and shutting down')
1353 self.stop()
1354 exit()
1356 def throttle(self, factor: int | None) -> None:
1357 if not factor:
1358 return
1360 assert 1 <= factor <= 50 # arbitrary upper limit
1361 self.throttling_factor = factor
1362 self._websocket_request('Emulation.setCPUThrottlingRate', params={'rate': factor})
    def stop(self):
        """Shut the browser down and release what the instance holds.

        When a websocket is attached, the browser is first asked to close and
        the socket is closed; errors at that stage are logged, not raised.
        Then the Chrome process is terminated (killed if still alive after
        5 seconds), the profile directory is removed and the previous
        ``SIGXCPU`` handler, if any, is restored.
        """
        # method may be called during `_open_websocket`
        if hasattr(self, 'ws'):
            try:
                self.screencaster.stop()

                self._websocket_request('Page.stopLoading')
                # unregister the service workers before closing the browser
                self._websocket_request('Runtime.evaluate', params={'expression': """
                ('serviceWorker' in navigator) &&
                    navigator.serviceWorker.getRegistrations().then(
                        registrations => Promise.all(registrations.map(r => r.unregister()))
                    )
                """, 'awaitPromise': True})
                # wait for the screenshot or whatever
                wait(self._responses.values(), 10)
                self._result.cancel()

                self._logger.info("Closing chrome headless with pid %s", self.chrome.pid)
                self._websocket_request('Browser.close')
            except ChromeBrowserException as e:
                _logger.runbot("WS error during browser shutdown: %s", e)
            except Exception:  # noqa: BLE001
                # best effort: the shutdown continues below
                _logger.warning("Error during browser shutdown", exc_info=True)
            self._logger.info("Closing websocket connection")
            self.ws.close()

        self._logger.info("Terminating chrome headless with pid %s", self.chrome.pid)
        self.chrome.terminate()
        try:
            self.chrome.wait(5)
        except subprocess.TimeoutExpired:
            self._logger.warning("Killing chrome headless with pid %s: still alive", self.chrome.pid)
            self.chrome.kill()

        self._logger.info('Removing chrome user profile "%s"', self.user_data_dir)
        shutil.rmtree(self.user_data_dir, ignore_errors=True)

        # Restore previous signal handler
        if self.sigxcpu_handler:
            signal.signal(signal.SIGXCPU, self.sigxcpu_handler)
1405 @property
1406 def executable(self):
1407 try:
1408 return _find_executable()
1409 except Exception:
1410 self._logger.warning('Chrome executable not found')
1411 raise
1413 def _spawn_chrome(self, cmd):
1414 log_path = pathlib.Path(self.user_data_dir, 'err.log')
1415 with log_path.open('wb') as log_file:
1416 # pylint: disable=subprocess-popen-preexec-fn
1417 proc = subprocess.Popen(cmd, stderr=log_file, preexec_fn=_preexec) # noqa: PLW1509
1419 port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort')
1420 for _ in range(CHECK_BROWSER_ITERATIONS):
1421 time.sleep(CHECK_BROWSER_SLEEP)
1422 if port_file.is_file() and port_file.stat().st_size > 5:
1423 with port_file.open('r', encoding='utf-8') as f:
1424 return proc, int(f.readline())
1426 if proc.poll() is None:
1427 proc.terminate()
1428 try:
1429 proc.wait(5)
1430 except subprocess.TimeoutExpired:
1431 proc.kill()
1432 proc.wait()
1433 self._logger.warning('Chrome headless failed to start:\n%s', log_path.read_text(encoding="utf-8"))
1434 # since the chrome never started, it's not going to be `stop`-ed so we
1435 # need to cleanup the directory here
1436 shutil.rmtree(self.user_data_dir, ignore_errors=True)
1438 raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.')
1440 def _chrome_start(
1441 self,
1442 user_data_dir: str,
1443 touch_enabled: bool,
1444 headless=True,
1445 debug=False,
1446 ):
1447 headless_switches = {
1448 '--headless': '',
1449 '--disable-extensions': '',
1450 '--disable-background-networking' : '',
1451 '--disable-background-timer-throttling' : '',
1452 '--disable-backgrounding-occluded-windows': '',
1453 '--disable-renderer-backgrounding' : '',
1454 '--disable-breakpad': '',
1455 '--disable-client-side-phishing-detection': '',
1456 '--disable-crash-reporter': '',
1457 '--disable-dev-shm-usage': '',
1458 '--disable-namespace-sandbox': '',
1459 '--disable-translate': '',
1460 '--no-sandbox': '',
1461 '--disable-gpu': '',
1462 '--enable-unsafe-swiftshader': '',
1463 '--mute-audio': '',
1464 }
1465 switches = {
1466 # required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour")
1467 '--autoplay-policy': 'no-user-gesture-required',
1468 '--disable-default-apps': '',
1469 '--disable-device-discovery-notifications': '',
1470 '--no-default-browser-check': '',
1471 '--remote-debugging-address': HOST,
1472 '--remote-debugging-port': str(self.remote_debugging_port),
1473 '--user-data-dir': user_data_dir,
1474 '--no-first-run': '',
1475 # FIXME: these next 2 flags are temporarily uncommented to allow client
1476 # code to manually run garbage collection. This is done as currently
1477 # the Chrome unit test process doesn't have access to its available
1478 # memory, so it cannot run the GC efficiently and may run out of memory
1479 # and crash. These should be re-commented when the process is correctly
1480 # configured.
1481 '--enable-precise-memory-info': '',
1482 '--js-flags': '--expose-gc',
1483 }
1484 if headless:
1485 switches.update(headless_switches)
1486 if touch_enabled:
1487 # enable Chrome's Touch mode, useful to detect touch capabilities using
1488 # "'ontouchstart' in window"
1489 switches['--touch-events'] = ''
1490 if debug is not False:
1491 switches['--auto-open-devtools-for-tabs'] = ''
1492 switches['--start-fullscreen'] = ''
1494 cmd = [self.executable]
1495 cmd += ['%s=%s' % (k, v) if v else k for k, v in switches.items()]
1496 url = 'about:blank'
1497 cmd.append(url)
1498 try:
1499 proc, devtools_port = self._spawn_chrome(cmd)
1500 except OSError:
1501 raise unittest.SkipTest("%s not found" % cmd[0])
1502 self._logger.info('Chrome pid: %s', proc.pid)
1503 self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir)
1505 return proc, devtools_port
1507 def _json_command(self, command, timeout=3):
1508 """Queries browser state using JSON
1510 Available commands:
1512 ``''``
1513 return list of tabs with their id
1514 ``list`` (or ``json/``)
1515 list tabs
1516 ``new``
1517 open a new tab
1518 :samp:`activate/{id}`
1519 activate a tab
1520 :samp:`close/{id}`
1521 close a tab
1522 ``version``
1523 get chrome and dev tools version
1524 ``protocol``
1525 get the full protocol
1526 """
1527 url = f'http://{HOST}:{self.devtools_port}/json/{command}'.rstrip('/')
1528 self._logger.info("Issuing json command %s", url)
1529 delay = 0.1
1530 tries = 0
1531 failure_info = None
1532 message = None
1533 while timeout > 0:
1534 if self.chrome.poll() is not None:
1535 message = 'Chrome crashed at startup'
1536 break
1537 try:
1538 r = requests.get(url, timeout=3)
1539 if r.ok:
1540 return r.json()
1541 except requests.ConnectionError as e:
1542 failure_info = str(e)
1543 message = 'Connection Error while trying to connect to Chrome debugger'
1544 except requests.exceptions.ReadTimeout as e:
1545 failure_info = str(e)
1546 message = 'Connection Timeout while trying to connect to Chrome debugger'
1547 break
1549 time.sleep(delay)
1550 timeout -= delay
1551 delay = delay * 1.5
1552 tries += 1
1553 self._logger.error("%s after %s tries" % (message, tries))
1554 if failure_info:
1555 self._logger.info(failure_info)
1556 self.stop()
1557 raise unittest.SkipTest("Error during Chrome headless connection")
1559 def _open_websocket(self):
1560 version = self._json_command('version')
1561 self._logger.info('Browser version: %s', version['Browser'])
1563 start = time.time()
1564 while (time.time() - start) < 5.0:
1565 ws_url = next((
1566 target['webSocketDebuggerUrl']
1567 for target in self._json_command('')
1568 if target['type'] == 'page'
1569 if target['url'] == 'about:blank'
1570 ), None)
1571 if ws_url:
1572 break
1574 time.sleep(0.1)
1575 else:
1576 self.stop()
1577 raise unittest.SkipTest("Error during Chrome connection: never found 'page' target")
1579 self._logger.info('Websocket url found: %s', ws_url)
1580 ws = websocket.create_connection(ws_url, enable_multithread=True, suppress_origin=True)
1581 if ws.getstatus() != 101:
1582 raise unittest.SkipTest("Cannot connect to chrome dev tools")
1583 ws.settimeout(0.01)
1584 return ws
    def _receive(self, dbname):
        """Body of the consumer thread: read and dispatch websocket messages.

        Messages without an ``id`` are dispatched to the handler registered
        for their ``method``; the others settle the pending future of their
        request. Errors raised while processing a message are logged.

        :param dbname: database name, stored on the current thread
        """
        threading.current_thread().dbname = dbname
        # So CDT uses a streamed JSON-RPC structure, meaning a request is
        # {id, method, params} and eventually a {id, result | error} should
        # arrive the other way, however for events it uses "notifications"
        # meaning request objects without an ``id``, but *coming from the server*
        while True:  # or maybe until `self._result` is `done()`?
            try:
                msg = self.ws.recv()
                if not msg:
                    continue
                self._logger.debug('\n<- %s', msg)
            except websocket.WebSocketTimeoutException:
                # nothing received within the socket timeout, poll again
                continue
            except websocket.WebSocketConnectionClosedException as e:
                if not self._result.done():
                    # detach the socket, fail the run and cancel pending requests
                    del self.ws
                    self._result.set_exception(e)
                    for f in self._responses.values():
                        f.cancel()
                return
            except Exception as e:
                if isinstance(e, ConnectionResetError) and self._result.done():
                    return
                # if the socket is still connected something bad happened,
                # otherwise the client was just shut down
                if self.ws.connected:
                    self._result.set_exception(e)
                    raise
                self._result.cancel()
                return

            res = json.loads(msg)
            request_id = res.get('id')
            try:
                if request_id is None:
                    # notification: dispatch to the matching handler, if any
                    if handler := self._handlers.get(res['method']):
                        handler(**res['params'])
                elif f := self._responses.pop(request_id, None):
                    # response to a request which asked for a future
                    if 'result' in res:
                        f.set_result(res['result'])
                    else:
                        f.set_exception(ChromeBrowserException(res['error']['message']))
            except Exception:
                _logger.exception(
                    "While processing message %s",
                    shorten(str(msg), 500, placeholder='...'),
                )
1635 def _websocket_request(self, method, *, params=None, timeout=10.0):
1636 assert threading.get_ident() != self._receiver.ident,\
1637 "_websocket_request must not be called from the consumer thread"
1638 if not hasattr(self, 'ws'):
1639 return None
1641 f = self._websocket_send(method, params=params, with_future=True)
1642 try:
1643 return f.result(timeout=timeout * self.throttling_factor)
1644 except concurrent.futures.TimeoutError:
1645 raise TimeoutError(f'{method}({params or ""})')
1647 def _websocket_send(self, method, *, params=None, with_future=False):
1648 """send chrome devtools protocol commands through websocket
1650 If ``with_future`` is set, returns a ``Future`` for the operation.
1651 """
1652 if not hasattr(self, 'ws'):
1653 return None
1655 result = None
1656 request_id = next(self._request_id)
1657 if with_future:
1658 result = self._responses[request_id] = Future()
1659 payload = {'method': method, 'id': request_id}
1660 if params:
1661 payload['params'] = params
1662 self._logger.debug('\n-> %s', payload)
1663 self.ws.send(json.dumps(payload))
1664 return result
1666 def _handle_request_paused(self, **params):
1667 url = params['request']['url']
1668 if url.startswith(f'http://{HOST}'):
1669 cmd = 'Fetch.continueRequest'
1670 response = {}
1671 else:
1672 cmd = 'Fetch.fulfillRequest'
1673 response = self.test_case.fetch_proxy(url)
1674 try:
1675 self._websocket_send(cmd, params={'requestId': params['requestId'], **response})
1676 except websocket.WebSocketConnectionClosedException:
1677 pass
1678 except (BrokenPipeError, ConnectionResetError, OSError):
1679 # this can happen if the browser is closed. Just ignore it.
1680 _logger.info("Websocket error while handling request %s", params['request']['url'])
    def _handle_console(self, type, args=None, stackTrace=None, **kw):  # pylint: disable=redefined-builtin
        """Handle a ``Runtime.consoleAPICalled`` event.

        The console message is rebuilt and logged on the ``browser`` child
        logger. An ``error`` message flags the run as failed and may settle
        the result with an exception; the success signal settles it
        successfully unless a dirty form view is found.

        :param type: console call type, mapped to a level through ``_TO_LEVEL``
        :param args: remote objects passed to the console call
        :param stackTrace: stack trace attached to the call, if any
        :param kw: remaining event parameters, ignored
        """
        # console formatting differs somewhat from Python's, if args[0] has
        # format modifiers that many of args[1:] get formatted in, missing
        # args are replaced by empty strings and extra args are concatenated
        # (space-separated)
        #
        # current version modifies the args in place which could and should
        # probably be improved
        if args:
            arg0, args = str(self._from_remoteobject(args[0])), args[1:]
        else:
            arg0, args = '', []
        formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)]
        # formatter consumes args it uses, leaves unformatted args untouched
        formatted.extend(str(self._from_remoteobject(arg)) for arg in args)
        message = ' '.join(formatted)
        stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace}))
        if stack:
            message += '\n' + stack

        log_type = type
        _logger = self._logger.getChild('browser')
        # once the result is settled, messages matched by IGNORED_MSGS are
        # logged as 'dir' messages
        if self._result.done() and IGNORED_MSGS(message):
            log_type = 'dir'
        _logger.log(
            self._TO_LEVEL.get(log_type, logging.INFO),
            "%s%s",
            "Error received after termination: " if self._result.done() else "",
            message  # might still have %<x> characters
        )

        if log_type == 'error':
            self.had_failure = True
            if self._result.done():
                return
            # without an error checker every error fails the run
            if not self.error_checker or self.error_checker(message):
                self.take_screenshot()
                try:
                    self._result.set_exception(ChromeBrowserException(message))
                except CancelledError:
                    ...
                except InvalidStateError:
                    self._logger.warning(
                        "Trying to set result to failed (%s) but found the future settled (%s)",
                        message, self._result
                    )
        elif message == self.success_signal:
            # both coroutines below are started right away by the ``run`` decorator
            @run
            def _get_heap():
                yield self._websocket_send("HeapProfiler.collectGarbage", with_future=True)
                r = yield self._websocket_send("Runtime.getHeapUsage", with_future=True)
                _logger.info("heap %d (allocated %d)", r['usedSize'], r['totalSize'])

            @run
            def _check_form():
                node_id = 0

                # any failure of the DOM lookup leaves ``node_id`` at 0
                with contextlib.suppress(Exception):
                    d = yield self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True)
                    form = yield self._websocket_send("DOM.querySelector", params={
                        'nodeId': d['root']['nodeId'],
                        'selector': '.o_form_dirty',
                    }, with_future=True)
                    node_id = form['nodeId']

                if node_id:
                    self.take_screenshot("unsaved_form_")
                    msg = """\
Tour finished with a dirty form view being open.

Dirty form views are automatically saved when the page is closed, \
which leads to stray network requests and inconsistencies."""
                    if self._result.done():
                        _logger.error("%s", msg)
                    else:
                        self._result.set_exception(ChromeBrowserException(msg))
                    return

                if not self._result.done():
                    self._result.set_result(True)
                elif self._result.exception() is None:
                    _logger.error("Tried to make the tour successful twice.")
def _handle_exception(self, exceptionDetails, timestamp):
    """Fail the test on an exception reported by the browser.

    The failure message is the exception text, followed by the readable
    form of the exception object (if any) and by its formatted stack (if
    any). When the result is already settled the message is only logged,
    unless ``IGNORED_MSGS`` matches it.

    :param exceptionDetails: exception description sent by the browser;
        its ``type`` key is overwritten with ``'trace'``
    :param timestamp: unused
    """
    message = exceptionDetails['text']
    remote_exception = exceptionDetails.get('exception')
    if remote_exception:
        message = message + str(self._from_remoteobject(remote_exception))
    # fake the record type so _format_stack accepts to format the stack
    exceptionDetails['type'] = 'trace'
    formatted_stack = ''.join(self._format_stack(exceptionDetails))
    if formatted_stack:
        message = message + '\n' + formatted_stack

    if self._result.done():
        if IGNORED_MSGS(message):
            return
        browser_logger = self._logger.getChild('browser')
        browser_logger.error("Exception received after termination: %s", message)
        return

    self.take_screenshot()
    try:
        self._result.set_exception(ChromeBrowserException(message))
    except CancelledError:
        # the future was cancelled in the meantime, nothing to report
        pass
    except InvalidStateError:
        self._logger.warning(
            "Trying to set result to failed (%s) but found the future settled (%s)",
            message, self._result
        )
def _handle_frame_stopped_loading(self, frameId):
    """Call, and forget, the callback registered in ``self._frames`` for ``frameId``."""
    on_stopped = self._frames.pop(frameId, None)
    if not on_stopped:
        return
    on_stopped()
# Logging level used for each console message type; types missing from this
# table are logged at INFO level by _handle_console.
# NOTE(review): logging.RUNBOT is not a standard library level, presumably it
# is registered by Odoo's logging setup -- confirm.
_TO_LEVEL = {
    'debug': logging.DEBUG,
    'log': logging.INFO,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'dir': logging.RUNBOT,
    # TODO: decide what to do with
    # dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed,
    # endGroup, assert, profile, profileEnd, count, timeEnd
}
def take_screenshot(self, prefix='sc_') -> Future[dict]:
    """Request a screenshot of the page and save it once it is received.

    :param prefix: file name prefix handed to ``save_test_file``
    :return: whatever ``_websocket_send`` returned for the
        ``Page.captureScreenshot`` request; when truthy, a done-callback
        saving the image has been attached to it
    """
    def _save_capture(capture):
        try:
            encoded_png = capture.result(timeout=0)['data']
        except Exception as exc:
            self._logger.runbot("Couldn't capture screenshot: %s", exc)
            return
        if encoded_png:
            png = binascii.a2b_base64(encoded_png)
            save_test_file(type(self.test_case).__name__, png, prefix, logger=self._logger)
            return
        self._logger.runbot("Couldn't capture screenshot: expected image data, got %r", encoded_png)

    self._logger.info('Asking for screenshot')
    pending = self._websocket_send('Page.captureScreenshot', with_future=True)
    if pending:
        pending.add_done_callback(_save_capture)
    return pending
def set_cookie(self, name, value, path, domain):
    """Set a cookie in the browser with a ``Network.setCookie`` request."""
    cookie = dict(name=name, value=value, path=path, domain=domain)
    self._websocket_request('Network.setCookie', params=cookie)
def delete_cookie(self, name, **kwargs):
    """Delete the cookie ``name`` with a ``Network.deleteCookies`` request.

    Only the ``url``, ``domain`` and ``path`` keyword arguments are
    forwarded, any other keyword is ignored.
    """
    forwarded = ('url', 'domain', 'path')
    params = {key: kwargs[key] for key in kwargs if key in forwarded}
    params['name'] = name
    self._websocket_request('Network.deleteCookies', params=params)
def _wait_ready(self, ready_code=None, timeout=60):
    """Evaluate ``ready_code`` in the page until it yields ``true``.

    :param ready_code: JavaScript expression to evaluate, defaults to
        ``document.readyState === 'complete'``
    :param timeout: maximum time to wait in seconds, scaled by
        ``self.throttling_factor``
    :return: ``True`` as soon as the expression evaluates to the boolean
        ``true``; ``False`` (after taking a screenshot) when the timeout
        expires first
    """
    timeout *= self.throttling_factor
    ready_code = ready_code or "document.readyState === 'complete'"
    self._logger.info('Evaluate ready code "%s"', ready_code)
    start_time = time.time()
    result = None
    while True:
        taken = time.time() - start_time
        if taken > timeout:
            break

        # only give the evaluation whatever is left of the overall timeout
        result = self._websocket_request('Runtime.evaluate', params={
            'expression': "try { %s } catch {}" % ready_code,
            'awaitPromise': True,
        }, timeout=timeout-taken)['result']

        if result == {'type': 'boolean', 'value': True}:
            time_to_ready = time.time() - start_time
            # compare the total time, including the evaluation which just
            # succeeded: `taken` was measured before that evaluation, so a
            # single slow evaluation would otherwise never be reported
            if time_to_ready > 2:
                self._logger.info('The ready code tooks too much time : %s', time_to_ready)
            return True

    self.take_screenshot(prefix='sc_failed_ready_')
    self._logger.info('Ready code last try result: %s', result)
    return False
def _wait_code_ok(self, code, timeout, error_checker=None):
    """Run ``code`` in the page and wait for the test result.

    :param code: JavaScript code to evaluate
    :param timeout: maximum time in seconds for the evaluation and for the
        result to be settled, scaled by ``self.throttling_factor``
    :param error_checker: stored on ``self.error_checker`` before the code
        is evaluated
    :raises ChromeBrowserException: if the evaluation returns an error, if
        the result future fails or times out, or if it completes without a
        truthy result / with ``had_failure`` set
    """
    timeout *= self.throttling_factor
    self.error_checker = error_checker
    self._logger.info('Evaluate test code "%s"', code)
    start = time.time()
    res = self._websocket_request('Runtime.evaluate', params={
        'expression': code,
        'awaitPromise': True,
    }, timeout=timeout)['result']
    if res.get('subtype') == 'error':
        raise ChromeBrowserException("Running code returned an error: %s" % res)

    err = ChromeBrowserException("failed")
    try:
        # if the runcode was a promise which took some time to execute,
        # discount that from the timeout (the elapsed time used to be
        # *added* to the timeout instead of being subtracted from it)
        remaining = max(0.0, timeout - (time.time() - start))
        if self._result.result(remaining) and not self.had_failure:
            return
    except CancelledError:
        # regular-ish shutdown
        return
    except ChromeBrowserException:
        self.screencaster.save()
        raise
    except Exception as e:
        err = e

    self.take_screenshot()
    self.screencaster.save()

    if isinstance(err, concurrent.futures.TimeoutError):
        raise ChromeBrowserException('Script timeout exceeded') from err
    raise ChromeBrowserException("Unknown error") from err
def navigate_to(self, url, wait_stop=False):
    """Navigate the page to ``url``.

    :param url: URL to load
    :param wait_stop: when true, wait up to 10 seconds for the callback
        registered in ``self._frames`` for the navigated frame to be called
    """
    self._logger.info('Navigating to: "%s"', url)
    nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0)
    self._logger.info("Navigation result: %s", nav_result)
    if not wait_stop:
        return
    frame_id = nav_result['frameId']
    stopped_loading = threading.Event()
    self._frames[frame_id] = stopped_loading.set
    self._logger.info('Waiting for frame %r to stop loading', frame_id)
    stopped_loading.wait(10)
def _from_remoteobject(self, arg):
    """Make a CDT RemoteObject comprehensible.

    * ``undefined`` is returned as the string ``'undefined'``;
    * anything which is not a plain object or an array is returned as its
      ``value``, else its ``description``, else the remote object itself;
    * arrays are rendered as ``[item, ...]`` from their preview properties
      whose name starts with digits;
    * other objects are rendered as ``ClassName(name=value, ...)`` from
      their preview properties which have a value.
    """
    objtype, subtype = arg['type'], arg.get('subtype')
    if objtype == 'undefined':
        # the undefined remoteobject is literally just {type: undefined}...
        return 'undefined'
    if objtype != 'object' or subtype not in (None, 'array'):
        # value is the json representation for json object, otherwise
        # fallback on the description (e.g. the traceback for errors, the
        # source for functions, ...) and finally on the entire arg mess
        return arg.get('value', arg.get('description', arg))

    # nb: preview properties are *not* recursive, the value is *all* we get
    properties = arg.get('preview', {}).get('properties', [])
    if subtype == 'array':
        # the value of an array is just `Array(3)` which is useless, use
        # the preview properties instead (the name being the index)
        items = [
            repr(prop['value']) if prop['type'] == 'string' else str(prop['value'])
            for prop in properties
            if re.match(r'\d+', prop['name'])
        ]
        return '[%s]' % ', '.join(items)

    # type=object, subtype=None aka custom or non-standard objects
    fields = [
        '%s=%s' % (prop['name'], repr(prop['value']) if prop['type'] == 'string' else prop['value'])
        for prop in properties
        if prop.get('value') is not None
    ]
    return '%s(%s)' % (arg.get('className') or 'object', ', '.join(fields))
# %-format template applied by _format_stack to each call frame of a stack trace
LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n'
def _format_stack(self, logrecord):
    """Yield one formatted line per call frame of a ``trace`` record.

    Nothing is yielded for records of any other type. The ``parent`` chain
    of the stack trace is followed until it ends.
    """
    if logrecord['type'] != 'trace':
        return

    trace = logrecord.get('stackTrace')
    while trace:
        yield from (self.LINE_PATTERN % frame for frame in trace['callFrames'])
        trace = trace.get('parent')
def console_formatter(self, args):
    """ Build a regex replacement function formatting like the console API:

    * if there are no args, don't format (return string as-is)
    * %% -> %
    * %c -> replace by styling directives (ignore for us)
    * other known formatters -> replace by corresponding argument
    * leftover known formatters (args exhausted) -> replace by empty string
    * unknown formatters -> return as-is

    The arguments used by the formatters are removed from ``args``.
    """
    if not args:
        return lambda match: match[0]

    def replacer(match):
        directive = match[0]
        kind = directive[1]
        if kind == '%':
            return '%'
        if kind not in 'sdfoOc':
            return directive
        if not args:
            return ''
        consumed = args.pop(0)
        # styling directives use up their argument but render nothing
        return '' if kind == 'c' else str(self._from_remoteobject(consumed))
    return replacer
class NoScreencast:
    """Screencaster stand-in whose operations all do nothing."""

    def start(self):
        """Do nothing."""

    def stop(self):
        """Do nothing."""

    def save(self):
        """Do nothing."""

    def __call__(self, sessionId, data, metadata):
        """Ignore the frame."""
class Screencaster:
    """Store the screencast frames of a browser and encode them with ffmpeg.

    Frames are written in a ``frames-<timestamp>`` directory created under
    ``<directory>/<db name>/screencasts``.
    """

    def __init__(self, browser: ChromeBrowser, directory: str):
        self.stopped = False
        self.browser: ChromeBrowser = browser
        self._logger: logging.Logger = browser._logger
        self.directory = pathlib.Path(directory, get_db_name(), 'screencasts')
        stamp = datetime.now().strftime('%Y%m%dT%H%M%S.%f')
        self.frames_dir = self.directory / f'frames-{stamp}'
        self.frames_dir.mkdir(parents=True, exist_ok=True)
        self.frames = []

    def start(self):
        """Ask the browser to start sending screencast frames."""
        self._logger.info('Starting screencast')
        self.browser._websocket_send('Page.startScreencast')

    def __call__(self, sessionId, data, metadata):
        """Acknowledge a frame and, unless stopped, write it to the frames directory."""
        self.browser._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId})
        if self.stopped:
            # if already stopped, drop the frames as we might have removed the directory already
            return
        frame_path = self.frames_dir / f'frame_{len(self.frames):05d}.png'
        try:
            frame_path.write_bytes(binascii.a2b_base64(data.encode()))
        except FileNotFoundError:
            return
        self.frames.append(dict(file_path=frame_path, timestamp=metadata.get('timestamp')))

    def stop(self):
        """Stop the screencast and discard the frames directory."""
        self.browser._websocket_send('Page.stopScreencast')
        self.stopped = True
        if not self.frames_dir.is_dir():
            return
        shutil.rmtree(self.frames_dir, ignore_errors=True)

    def save(self):
        """Stop the screencast and encode the received frames into an mp4.

        Does nothing when already stopped. Without ffmpeg, or if the
        encoding fails, the frames are kept and their location is logged.
        """
        if self.stopped:
            return
        self.browser._websocket_send('Page.stopScreencast')
        # Wait for frames just in case, ideally we'd wait for the Browse.close
        # event or something but that doesn't exist.
        time.sleep(5)
        self.stopped = True
        if not self.frames:
            self._logger.debug('No screencast frames to encode')
            return

        frames, self.frames = self.frames, []
        now = time.time()
        duration = 1/24
        # NOTE: with_suffix() replaces the ".<microseconds>" part of the
        # directory name, it does not append to it
        concat_script_path = self.frames_dir.with_suffix('.txt')
        with concat_script_path.open("w") as concat_file:
            # pair every frame with its successor, None for the last one
            for frame, following in zip(frames, frames[1:] + [None]):
                shown_at = frame['timestamp']
                if shown_at is not None:
                    hidden_at = following['timestamp'] if following else now
                    duration = hidden_at - shown_at
                # a frame without timestamp reuses the previous duration
                concat_file.write(f"file '{frame['file_path']}'\nduration {duration}\n")
            concat_file.write(f"file '{frames[-1]['file_path']}'")  # needed by the concat plugin

        try:
            ffmpeg_path = find_in_path('ffmpeg')
        except IOError:
            self._logger.runbot('Screencast frames in: %s', self.frames_dir)
            return

        outfile = self.frames_dir.with_suffix('.mp4')
        command = [
            ffmpeg_path,
            '-y', '-loglevel', 'warning',
            '-f', 'concat', '-safe', '0', '-i', concat_script_path,
            '-vf', 'pad=ceil(iw/2)*2:ceil(ih/2)*2',
            '-c:v', 'libx265', '-x265-params', 'lossless=1',
            outfile,
        ]
        try:
            subprocess.run(command, preexec_fn=_preexec, check=True)
        except subprocess.CalledProcessError:
            self._logger.error('Failed to encode screencast, screencast frames in %s', self.frames_dir)
            return
        concat_script_path.unlink()
        shutil.rmtree(self.frames_dir, ignore_errors=True)
        self._logger.runbot('Screencast in: %s', outfile)
@lru_cache(1)
def _find_executable():
    """Return the path of the Chrome / Chromium executable for this platform.

    :raises unittest.SkipTest: if no executable is found
    """
    system = platform.system()
    if system == 'Linux':
        for name in ('google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable'):
            # find_in_path signals a missing binary with an IOError
            with contextlib.suppress(IOError):
                return find_in_path(name)

    elif system == 'Darwin':
        candidates = (
            '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
            '/Applications/Chromium.app/Contents/MacOS/Chromium',
        )
        for candidate in candidates:
            if os.path.exists(candidate):
                return candidate

    elif system == 'Windows':
        templates = (
            '%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe',
            '%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe',
            '%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe',
        )
        for candidate in map(os.path.expandvars, templates):
            if os.path.exists(candidate):
                return candidate

    raise unittest.SkipTest("Chrome executable not found")
class Opener(requests.Session):
    """
    HTTP session flushing and clearing the test transaction before each request.

    This is likely necessary when we make a request to the server, as the
    request is made with a test cursor, which uses a different cache than this
    transaction.
    """
    def __init__(self, http_case: HttpCase):
        super().__init__()
        self.test_case = http_case
        self.cr = http_case.cr

    def request(self, *args, **kwargs):
        """Send the request with HTTP requests allowed on the test case."""
        assert self.test_case.opener == self
        cursor = self.cr
        cursor.flush()
        cursor.clear()
        with self.test_case.allow_requests():
            response = super().request(*args, **kwargs)
        return response
class Transport(xmlrpclib.Transport):
    """ XML-RPC transport flushing and clearing the test transaction before
    each request, see :class:`Opener` """
    def __init__(self, http_case: HttpCase):
        self.test_case = http_case
        self.cr = http_case.cr
        super().__init__()

    def request(self, *args, **kwargs):
        """Send the request with all HTTP requests allowed on the test case."""
        cursor = self.cr
        cursor.flush()
        cursor.clear()
        with self.test_case.allow_requests(all_requests=True):
            response = super().request(*args, **kwargs)
        return response
class JsonRpcException(Exception):
    """Error returned in a JSON-RPC response; ``code`` holds its error code."""

    def __init__(self, code, message):
        self.code = code
        super().__init__(message)
class HttpCase(TransactionCase):
    """ Transactional HTTP TestCase with url_open and Chrome headless helpers. """
    # when true, setUpClass() calls registry_enter_test_mode_cls()
    registry_test_mode = True
    browser = None
    browser_size = '1366x768'
    touch_enabled = False
    # HTTP session created (and replaced) by authenticate()
    session: odoo.http.Session = None

    # logger named after the test class, set by setUpClass(); setUp() narrows
    # it to a child logger named after the test method
    _logger: logging.Logger = None
@classmethod
def setUpClass(cls):
    """Enter registry test mode if requested, point ``web.base.url`` to the
    test server and prepare the XML-RPC URL and the class logger."""
    super().setUpClass()
    if cls.registry_test_mode:
        cls.registry_enter_test_mode_cls()

    config_parameters = cls.env['ir.config_parameter']
    config_parameters.set_param('web.base.url', cls.base_url())
    config_parameters.env.flush_all()
    # v8 api with correct xmlrpc exception handling.
    cls.xmlrpc_url = cls.base_url() + '/xmlrpc/2/'
    cls._logger = logging.getLogger(f'{cls.__module__}.{cls.__name__}')
@classmethod
def base_url(cls):
    """Return the ``http://<HOST>:<port>`` URL of the test HTTP server."""
    port = cls.http_port()
    return f"http://{HOST}:{port:d}"
@classmethod
def http_port(cls):
    """Return the port of the running HTTP server, ``None`` without a server."""
    server = odoo.service.server.server
    return None if server is None else server.httpd.server_port
def setUp(self):
    """Prepare the per-test logger, XML-RPC proxies, URL opener and the
    patch allowing HTTP requests during wkhtmltopdf rendering."""
    super().setUp()

    self._logger = self._logger.getChild(self._testMethodName)

    def make_proxy(endpoint, **options):
        return xmlrpclib.ServerProxy(self.xmlrpc_url + endpoint, transport=Transport(self), **options)

    self.xmlrpc_common = make_proxy('common')
    self.xmlrpc_db = make_proxy('db')
    self.xmlrpc_object = make_proxy('object', use_datetime=True)
    # setup an url opener helper
    self.opener = Opener(self)
    self.http_key_sequence = itertools.count()
    # we need to allow requests during pdf rendering.
    old_run_wkhtmltopdf = ir_actions_report._run_wkhtmltopdf

    def _patched_run_wkhtmltopdf(args):
        with patch.object(self, 'http_request_key', 'wkhtmltopdf'), release_test_lock():
            cookie_args = ['--cookie', TEST_CURSOR_COOKIE_NAME, 'wkhtmltopdf']
            return old_run_wkhtmltopdf(cookie_args + list(args))

    wkhtmltopdf_patch = patch.object(ir_actions_report, '_run_wkhtmltopdf', _patched_run_wkhtmltopdf)
    self.startPatcher(wkhtmltopdf_patch)
@contextmanager
def enter_registry_test_mode(self):
    """Context manager which only warns that the case is already in test mode."""
    warning = "HTTPCase is already in test mode"
    _logger.warning(warning)
    yield None
@contextmanager
def allow_pdf_render(self):
    """Context manager which only warns that calling it is not required."""
    warning = "HTTPCase does not require calling allow_pdf_render"
    _logger.warning(warning)
    yield None
@contextmanager
def allow_requests(self, browser: ChromeBrowser | None = None, all_requests=False):
    """
    Allows HTTP requests for the scope of the context.

    A new request key is generated, set as ``http_request_key`` and as the
    test cursor cookie of the opener; both are restored on exit.

    Params:
        browser (ChromeBrowser | None): if given, add the cookie to the browser.
        all_requests (bool): if True, allows all requests regardless of cookie.
    """
    with ExitStack() as defer:
        defer.enter_context(release_test_lock())
        if all_requests:
            # restore the previous value on exit, otherwise the blanket
            # permission would outlive the context
            defer.callback(
                setattr, self, 'http_request_allow_all',
                getattr(self, 'http_request_allow_all', False),
            )
            self.http_request_allow_all = True
        new_key = f'{self.canonical_tag}__{next(self.http_key_sequence)}'
        defer.enter_context(patch.object(self, 'http_request_key', new_key))
        old_cookie = self.opener.cookies.get(TEST_CURSOR_COOKIE_NAME)
        if old_cookie:
            defer.callback(self.opener.cookies.set, TEST_CURSOR_COOKIE_NAME, old_cookie)
        else:
            defer.callback(self.opener.cookies.pop, TEST_CURSOR_COOKIE_NAME, None)
        self.opener.cookies[TEST_CURSOR_COOKIE_NAME] = new_key
        if browser:
            browser.set_cookie(
                TEST_CURSOR_COOKIE_NAME, self.http_request_key, '/', HOST,
            )
        yield
def parse_http_location(self, location):
    """ Parse a Location http header typically found in 201/3xx
    responses, return the corresponding parsed url object. The scheme/host
    are taken from ``base_url()`` in case they are missing from the
    header. An empty location yields the parsed empty url.
    """
    if not location:
        return urlsplit('')
    absolute_url = urljoin(self.base_url(), location)
    parts = urlsplit(absolute_url)
    # normalise query parameters
    normalised_query = urlencode(parse_qsl(parts.query))
    return parts._replace(query=normalised_query)
def assertURLEqual(self, test_url, truth_url, message=None):
    """ Assert that two URLs are equivalent. If any URL is missing
    a scheme and/or host, assume the same scheme/host as base_url()
    """
    actual = self.parse_http_location(test_url)
    expected = self.parse_http_location(truth_url)
    self.assertEqual(actual, expected, message)
def build_rpc_payload(self, params=None):
    """
    Helper to properly build jsonrpc payload: a ``call`` request with a
    random id and the given params (an empty dict by default)
    """
    payload = dict(jsonrpc="2.0", method="call", id=str(uuid4()))
    payload["params"] = params or {}
    return payload
def url_open(self, url, data=None, files=None, timeout=12, headers=None, json=None, params=None, allow_redirects=True, cookies=None, method: str | None = None):
    """Send an HTTP request through the test opener and return its response.

    Without an explicit ``method``, the request is a POST when ``data``,
    ``files`` or ``json`` is given and a GET otherwise. URLs starting with
    ``/`` are prefixed with ``base_url()``.
    """
    if not method:
        method = 'POST' if (data or files or json) else 'GET'
    target = self.base_url() + url if url.startswith('/') else url
    return self.opener.request(
        method, target,
        params=params, data=data, json=json, files=files,
        timeout=timeout, headers=headers, cookies=cookies,
        allow_redirects=allow_redirects,
    )
def _wait_remaining_requests(self, timeout=10):
    """Wait for the threads still handling HTTP requests.

    The threads whose name starts with ``odoo.service.http.request.`` are
    joined, for at most ``timeout`` seconds overall. Those still running
    afterwards are logged and the stacks are dumped.
    """
    thread_prefix = 'odoo.service.http.request.'

    def pending_request_threads():
        return [thread for thread in threading.enumerate() if thread.name.startswith(thread_prefix)]

    start_time = time.time()
    waited_threads = pending_request_threads()
    if not waited_threads:
        return

    self._logger.info('waiting for threads: %s', waited_threads)

    for thread in waited_threads:
        # every join only gets what is left of the overall timeout
        thread.join(timeout - (time.time() - start_time))

    leftover_threads = pending_request_threads()
    for thread in leftover_threads:
        self._logger.info("Stop waiting for thread %s handling request for url %s",
                          thread.name, getattr(thread, 'url', '<UNKNOWN>'))

    if not leftover_threads:
        return
    self._logger.info('remaining requests')
    odoo.tools.misc.dumpstacks()
def logout(self, keep_db=True):
    """Log the current session out and save it in the session store."""
    session = self.session
    session.logout(keep_db=keep_db)
    odoo.http.root.session_store.save(session)
def authenticate(self, user, password, *,
                 browser: ChromeBrowser = None, session_extra: dict | None = None):
    """Create a new HTTP session, authenticated as ``user`` if given.

    Any previous session of the test case is deleted from the session
    store. The opener is replaced by a fresh one carrying the new session
    cookie.

    :param user: login to authenticate with; falsy for an anonymous session
    :param password: password passed in the credential (the actual check
        is patched out)
    :param browser: if given, the session cookie is also set in the browser
    :param session_extra: extra values for the session; its ``context``
        key, if any, is merged in the session context. The dict itself is
        left untouched.
    :return: the new session
    """
    if getattr(self, 'session', None):
        odoo.http.root.session_store.delete(self.session)

    self.session = session = odoo.http.root.session_store.new()
    session.update(
        odoo.http.get_default_session(),
        db=get_db_name(),
        # In order to avoid perform a query to each first `url_open`
        # in a test (insert `res.device.log`).
        _trace_disable=True,
    )
    session.context['lang'] = odoo.http.DEFAULT_LANG

    if session_extra:
        # work on a copy: popping 'context' must not alter the dict of the
        # caller, who may reuse it for another call
        session_extra = dict(session_extra)
        if extra_ctx := session_extra.pop('context', None):
            session.context.update(extra_ctx)
        session.update(session_extra)

    if user: # if authenticated
        # Flush and clear the current transaction. This is useful, because
        # the call below opens a test cursor, which uses a different cache
        # than this transaction.
        self.cr.flush()
        self.cr.clear()

        # `self` is the user record here: this function replaces a method
        def patched_check_credentials(self, credential, env):
            return {'uid': self.id, 'auth_method': 'password', 'mfa': 'default'}

        # patching to speedup the check in case the password is hashed with many hashround + avoid to update the password
        with patch('odoo.addons.base.models.res_users.ResUsersPatchedInTest._check_credentials', new=patched_check_credentials):
            credential = {'login': user, 'password': password, 'type': 'password'}
            auth_info = self.env['res.users'].authenticate(credential, {'interactive': False})
        uid = auth_info['uid']
        env = api.Environment(self.cr, uid, {})
        session.uid = uid
        session.login = user
        session.session_token = uid and security.compute_session_token(session, env)
        session.context = dict(env['res.users'].context_get())

    odoo.http.root.session_store.save(session)
    # Reset the opener: turns out when we set cookies['foo'] we're really
    # setting a cookie on domain='' path='/'.
    #
    # But then our friendly neighborhood server might set a cookie for
    # domain='localhost' path='/' (with the same value) which is considered
    # a *different* cookie following ours rather than the same.
    #
    # When we update our cookie, it's done in-place, so the server-set
    # cookie is still present and (as it follows ours and is more precise)
    # very likely to still be used, therefore our session change is ignored.
    #
    # An alternative would be to set the cookie to None (unsetting it
    # completely) or clear-ing session.cookies.
    self.opener = Opener(self)
    self.opener.cookies.set("session_id", session.sid, domain=HOST)
    if browser:
        self._logger.info('Setting session cookie in browser')
        browser.set_cookie('session_id', session.sid, '/', HOST)

    return session
def fetch_proxy(self, url):
    """
    This method is called every time a request is made from the chrome browser outside the local network
    Returns a response that will be sent to the browser to simulate the external request:
    an empty file for the google fonts stylesheets, a 404 for anything else.
    """
    if 'https://fonts.googleapis.com/css' in url:
        _logger.info('External chrome request during tests: Return empty file for %s', url)
        # return empty css file, we don't care
        return self.make_fetch_proxy_response('')

    _logger.info('External chrome request during tests: returning 404 for %s', url)
    return dict(body='', responseCode=404, responseHeaders=[])
def make_fetch_proxy_response(self, content, code=200):
    """Build the response dict simulating an external request.

    :param content: response body, text content is encoded first
    :param code: HTTP status code of the response
    :return: dict holding the base64 encoded body, the response code and
        the response headers
    """
    raw = content.encode() if isinstance(content, str) else content
    headers = [
        {'name': 'access-control-allow-origin', 'value': '*'},
        {'name': 'cache-control', 'value': 'public, max-age=10000'},
    ]
    return {
        'body': base64.b64encode(raw).decode(),
        'responseCode': code,
        'responseHeaders': headers,
    }
def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, success_signal=DEFAULT_SUCCESS_SIGNAL, debug=False, cpu_throttling=None, **kw):
    """ Test JavaScript code running in the browser.

    To signal success test do: `console.log()` with the expected `success_signal`. Default is "test successful"
    To signal test failure raise an exception or call `console.error` with a message.
    Test will stop when a failure occurs if `error_checker` is not defined or returns `True` for this message

    :param string url_path: URL path to load the browser page on
    :param string code: JavaScript code to be executed
    :param string ready: JavaScript object to wait for before proceeding with the test
    :param string login: logged in user which will execute the test. e.g. 'admin', 'demo'
    :param int timeout: maximum time to wait for the test to complete (in seconds). Default is 60 seconds
    :param dict cookies: dictionary of cookies to set before loading the page
    :param error_checker: function to filter failures out.
        If provided, the function is called with the error log message, and if it returns `False` the log is ignored and the test continue
        If not provided, every error log triggers a failure
    :param bool watch: open a new browser window to watch the test execution
    :param string success_signal: string signal to wait for to consider the test successful
    :param bool debug: automatically open a fullscreen Chrome window with opened devtools and a debugger breakpoint set at the start of the tour.
        The tour is run with the `debug=assets` query parameter. When an error is thrown, the debugger stops on the exception.
    :param int cpu_throttling: CPU throttling rate as a slowdown factor (1 is no throttle, 2 is 2x slowdown, etc)
    :param kw: extra keyword arguments, accepted but not used by this method
    """
    if not self.env.registry.loaded:
        self._logger.warning('HttpCase test should be in post_install only')

    # increase timeout if coverage is running
    if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename):
        timeout = timeout * 1.5

    # debugging implies watching the browser, with a very large timeout
    if debug is not False:
        watch = True
        timeout = 1e6
    if watch:
        self._logger.warning('watch mode is only suitable for local testing')

    browser = ChromeBrowser(self, headless=not watch, success_signal=success_signal, debug=debug)
    # callbacks and contexts registered on `atexit` are unwound in reverse
    # order of registration when the block is left
    with self.allow_requests(browser=browser), contextlib.ExitStack() as atexit:
        atexit.callback(self._wait_remaining_requests)
        if "bus.bus" in self.env.registry:
            from odoo.addons.bus.websocket import CloseCode, _kick_all, WebsocketConnectionHandler # noqa: PLC0415
            from odoo.addons.bus.models.bus import BusBus # noqa: PLC0415

            atexit.callback(_kick_all, CloseCode.KILL_NOW)
            original_send_one = BusBus._sendone

            # `self` is the bus.bus record here, not the test case
            def sendone_wrapper(self, target, notification_type, message):
                original_send_one(self, target, notification_type, message)
                self.env.cr.precommit.run() # Trigger the creation of bus.bus records
                self.env.cr.postcommit.run() # Trigger notification dispatching

            atexit.enter_context(patch.object(BusBus, "_sendone", sendone_wrapper))
            atexit.enter_context(patch.object(
                WebsocketConnectionHandler, "websocket_allowed", return_value=True
            ))

        # the login is also passed as the password
        self.authenticate(login, login, browser=browser)
        # Flush and clear the current transaction. This is useful in case
        # we make requests to the server, as these requests are made with
        # test cursors, which uses different caches than this transaction.
        self.cr.flush()
        self.cr.clear()
        url = urljoin(self.base_url(), url_path)
        if watch:
            # add the watch (and debug) flags to the query string of the url
            parsed = urlsplit(url)
            qs = dict(parse_qsl(parsed.query))
            qs['watch'] = '1'
            if debug is not False:
                qs['debug'] = "assets"
            url = urlunsplit(parsed._replace(query=urlencode(qs)))
        self._logger.info('Open "%s" in browser', url)

        browser.screencaster.start()
        if cookies:
            for name, value in cookies.items():
                browser.set_cookie(name, value, '/', HOST)

        # the environment variable, when set, overrides the cpu_throttling argument
        cpu_throttling_os = os.environ.get('ODOO_BROWSER_CPU_THROTTLING') # used by dedicated runbot builds
        cpu_throttling = int(cpu_throttling_os) if cpu_throttling_os else cpu_throttling

        if cpu_throttling:
            # only a throttling requested by the test itself deserves a warning
            _logger.log(
                logging.INFO if cpu_throttling_os else logging.WARNING,
                'CPU throttling mode is only suitable for local testing - '
                'Throttling browser CPU to %sx slowdown and extending timeout to %s sec', cpu_throttling, timeout)
            browser.throttle(cpu_throttling)

        # without ready code, wait for the frame to stop loading instead
        browser.navigate_to(url, wait_stop=not bool(ready))
        atexit.callback(browser.stop)

        # Needed because tests like test01.js (qunit tests) are passing a ready
        # code = ""
        self.assertTrue(browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready)

        error = False
        try:
            browser._wait_code_ok(code, timeout, error_checker=error_checker)
        except ChromeBrowserException as chrome_browser_exception:
            error = chrome_browser_exception
        if error: # dont keep initial traceback, keep that outside of except
            if code:
                message = 'The test code "%s" failed' % code
            else:
                message = "Some js test failed"
            self.fail('%s\n\n%s' % (message, error))
def start_tour(self, url_path, tour_name, step_delay=None, **kwargs):
    """Wrapper for `browser_js` to start the given `tour_name` with the
    optional delay between steps `step_delay`. Other arguments from
    `browser_js` can be passed as keyword arguments.

    The ``delay_to_check_undeterminisms`` keyword (defaulting to the
    ``ODOO_TOUR_DELAY_TO_CHECK_UNDETERMINISMS`` environment variable) is
    passed to the tour and, when positive, extends the timeout.
    """
    env_delay = int(os.getenv("ODOO_TOUR_DELAY_TO_CHECK_UNDETERMINISMS", "0")) or 0
    undeterminism_delay = kwargs.pop('delay_to_check_undeterminisms', env_delay)
    options = {
        'stepDelay': step_delay or 0,
        'keepWatchBrowser': kwargs.get('watch', False),
        'debug': kwargs.get('debug', False),
        'startUrl': url_path,
        'delayToCheckUndeterminisms': undeterminism_delay,
    }
    default_code = f"odoo.startTour({tour_name!r}, {json.dumps(options)})"
    default_ready = f"odoo.isTourReady({tour_name!r})"
    code = kwargs.pop('code', default_code)
    ready = kwargs.pop('ready', default_ready)
    timeout = kwargs.pop('timeout', 60)

    if step_delay is not None:
        self._logger.warning('step_delay is only suitable for local testing')
    if undeterminism_delay > 0:
        timeout = timeout + 1000 * undeterminism_delay
        _logger.runbot("Tour %s is launched with mode: check for undeterminisms.", tour_name)
    Users = self.registry['res.users']

    def setup(_):
        Users.tour_enabled = False

    # keep the tours disabled on the users model while this one runs
    with ExitStack() as patches:
        patches.enter_context(patch.object(Users, 'tour_enabled', False))
        patches.enter_context(patch.object(Users, '_post_model_setup__', setup))
        patches.enter_context(patch.object(Users, '_compute_tour_enabled', lambda _: None))
        self.browser_js(url_path=url_path, code=code, ready=ready, timeout=timeout, success_signal="tour succeeded", **kwargs)
def profile(self, **kwargs):
    """
    for http_case, also patch _get_profiler_context_manager in order to profile all requests:
    each request gets its own profiler, registered as a sub profiler of the main one
    """
    parent = super()
    main_profiler = parent.profile(**kwargs)

    def route_profiler(request):
        request_profiler = parent.profile(description=request.httprequest.full_path, db=main_profiler.db)
        main_profiler.sub_profilers.append(request_profiler)
        return request_profiler

    request_patch = patch('odoo.http.Request._get_profiler_context_manager', route_profiler)
    return profiler.Nested(main_profiler, request_patch)
def get_method_additional_tags(self, test_method):
    """
    Return the inherited additional tags, plus ``is_tour`` when the test
    method looks like a tour.

    The source of the method is only inspected when the configured
    ``test_tags`` mention ``is_tour``; a method counts as a tour when its
    source contains ``self.start_tour``.
    """
    tags = super().get_method_additional_tags(test_method)
    configured_tags = odoo.tools.config['test_tags']
    if not configured_tags or 'is_tour' not in configured_tags:
        return tags
    if 'self.start_tour' in inspect.getsource(test_method):
        tags.append('is_tour')
    return tags
def make_jsonrpc_request(self, route, params=None, headers=None, cookies=None, timeout=12):
    """Send a JSON-RPC ``call`` to ``route`` and return its result.

    :param route: path joined to ``self.base_url()``
    :param params: JSON-RPC params; an empty dict is sent when falsy
    :param headers: passed through to the HTTP POST
    :param cookies: passed through to the HTTP POST
    :param timeout: passed through to the HTTP POST
    :return: the ``result`` member of the response, or ``None`` if absent
    :raises requests.HTTPError: if one occurred
    :raises JsonRpcException: if the response contains an error
    """
    payload = {
        'id': 0,
        'jsonrpc': '2.0',
        'method': 'call',
        'params': params or {},
    }
    url = urljoin(self.base_url(), route)
    response = self.opener.post(
        url,
        json=payload,
        headers=headers,
        cookies=cookies,
        timeout=timeout,
    )
    response.raise_for_status()
    body = response.json()
    if 'error' not in body:
        # workaround: JsonRPCDispatcher is broken and may send neither result nor error
        return body.get('result')
    failure = body['error']
    raise JsonRpcException(
        code=failure['code'],
        message=failure['data']['name'],
    )
def no_retry(arg):
    """Mark the decorated test method or test class as not auto-retried.

    Sets ``_retry`` to ``False`` on ``arg`` and returns ``arg`` itself.
    """
    setattr(arg, '_retry', False)
    return arg
def users(*logins):
    """ Decorate a test method so it runs once per given login.

    Each run happens in its own ``subTest`` with ``self.uid`` switched to
    the matching user; the initial ``self.uid`` is restored afterwards.
    """
    assert logins, "Expecting at least one login to execute"

    def users_decorator(func, /):
        @wraps(func)
        def with_users(self, *args, **kwargs):
            initial_uid = self.uid
            try:
                # look up the users, archived ones included
                user_model = self.env['res.users'].with_context(active_test=False)
                matching = user_model.search([('login', 'in', list(logins))])
                uid_by_login = {}
                for record in matching:
                    uid_by_login[record.login] = record.id

                for login in logins:
                    with self.subTest(login=login):
                        # run the test as that user
                        self.uid = uid_by_login[login]
                        func(self, *args, **kwargs)
                        self.env.flush_all()
                    # Drop the cache between subtests so the next user does
                    # not reuse the previous user's cache
                    # (`test_read_mail`, `test_write_mail`)
                    self.env.invalidate_all()
            finally:
                self.uid = initial_uid

        return with_users
    return users_decorator
def warmup(func, /):
    """
    Stabilize assertQueries and assertQueryCount assertions.

    The cache is first brought to a stable state: pending changes are
    flushed and the cache is invalidated.

    The decorated function then runs twice. The first run only warms up
    the ormcaches: it happens with ``self.warm`` set to ``False`` (so that
    assertQueries and assertQueryCount assertions are ignored) inside a
    savepoint that is closed afterwards, which discards all changes but
    the ormcaches ones. The second run is the actual test, with
    ``self.warm`` set to ``True``.
    """
    @wraps(func)
    def warmup(self, *args, **kwargs):
        env = self.env
        env.flush_all()
        env.invalidate_all()

        # warm-up run, wrapped in a savepoint closed on exit
        self.warm = False
        savepoint = self.cr.savepoint(flush=False)
        with contextlib.closing(savepoint):
            func(self, *args, **kwargs)
            env.flush_all()

        # actual run
        env.invalidate_all()
        self.warm = True
        func(self, *args, **kwargs)

    return warmup
def can_import(module):
    """ Tell whether ``module`` can be imported: ``True`` if the import
    succeeds, ``False`` if it raises ``ImportError``.

    Meant for ``unittest.skipUnless`` on tests that rely on *optional*
    dependencies, which may or may not be present but must still be tested
    whenever possible.
    """
    try:
        importlib.import_module(module)
    except ImportError:
        return False
    return True
def tagged(*tags):
    """A decorator to tag BaseCase objects.

    Tags live in a set exposed as the 'test_tags' attribute of the
    decorated object.

    A tag prefixed by '-' removes that tag, e.g. '-standard' removes the
    'standard' tag.

    By default, all Test classes from odoo.tests.common have a test_tags
    attribute that defaults to 'standard' and 'at_install'.

    When using class inheritance, the tags ARE inherited.

    A warning is logged when the resulting tags do not contain exactly one
    of 'at_install' and 'post_install'.
    """
    include, exclude = set(), set()
    for tag in tags:
        if tag.startswith('-'):
            exclude.add(tag[1:])
        else:
            include.add(tag)

    def tags_decorator(obj):
        inherited = getattr(obj, 'test_tags', set())
        obj.test_tags = (inherited | include) - exclude
        runs_at_install = 'at_install' in obj.test_tags
        runs_post_install = 'post_install' in obj.test_tags
        # both present or both absent: exactly one of them is expected
        if runs_at_install == runs_post_install:
            _logger.warning('A tests should be either at_install or post_install, which is not the case of %r', obj)
        return obj

    return tags_decorator
class freeze_time:
    """ Stand-in for freezegun's ``freeze_time`` in Odoo test suites.

    Decorating a test case class only records this object on the class as
    its ``freeze_time`` attribute; any other decorated object is handed to
    the underlying freezegun freezer. Instances also work as context
    managers and expose ``start``/``stop``.
    """
    # Captured when the class body runs, wrapped in staticmethod so that
    # it is not bound to the instance when accessed through `self`.
    _freeze_time = staticmethod(freezegun.freeze_time)

    def __init__(self, time_to_freeze=None, tz_offset=0, tick=False, as_kwarg='', auto_tick_seconds=0):
        freezer_options = {
            'time_to_freeze': time_to_freeze,
            'tz_offset': tz_offset,
            'tick': tick,
            'as_kwarg': as_kwarg,
            'auto_tick_seconds': auto_tick_seconds,
        }
        self.freezer = self._freeze_time(**freezer_options)

    def __call__(self, arg):
        decorates_test_class = isinstance(arg, type) and issubclass(arg, case.TestCase)
        if not decorates_test_class:
            return self.freezer(arg)
        # Test case classes are not wrapped: the freezer is only attached.
        arg.freeze_time = self
        return arg

    def __enter__(self):
        return self.freezer.start()

    def __exit__(self, *args):
        self.freezer.stop()

    # Explicit start/stop API, same behaviour as entering/leaving the context.
    start = __enter__
    stop = __exit__
# Rebind `freezegun.freeze_time` to the `freeze_time` class defined in this
# module, so that lookups of that attribute made from now on resolve to the
# wrapper. The class keeps the previous callable in `_freeze_time`, which
# was captured when the class body ran, before this rebinding.
freezegun.freeze_time = freeze_time