Coverage for adhoc-cicd-odoo-odoo / odoo / tests / test_cursor.py: 19%
82 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1from __future__ import annotations
3from datetime import datetime
4import threading
6from odoo.sql_db import BaseCursor, Cursor, Savepoint, _logger
7import odoo
class TestCursor(BaseCursor):
    """ A pseudo-cursor to be used for tests, on top of a real cursor. It keeps
        the transaction open across requests, and simulates committing, rolling
        back, and closing:

        +------------------------+---------------------------------------------------+
        |  test cursor           | queries on actual cursor                          |
        +========================+===================================================+
        |``cr = TestCursor(...)``|                                                   |
        +------------------------+---------------------------------------------------+
        | ``cr.execute(query)``  | SAVEPOINT test_cursor_N (if not savepoint)        |
        |                        | query                                             |
        +------------------------+---------------------------------------------------+
        |  ``cr.commit()``       | RELEASE SAVEPOINT test_cursor_N (if savepoint)    |
        +------------------------+---------------------------------------------------+
        |  ``cr.rollback()``     | ROLLBACK TO SAVEPOINT test_cursor_N (if savepoint)|
        +------------------------+---------------------------------------------------+
        |  ``cr.close()``        | ROLLBACK TO SAVEPOINT test_cursor_N (if savepoint)|
        |                        | RELEASE SAVEPOINT test_cursor_N (if savepoint)    |
        +------------------------+---------------------------------------------------+

        Attributes and methods that are not defined here are looked up on the
        wrapped cursor (see ``__getattr__``).
    """
    # Stack of the test cursors that are currently open. It is a class
    # attribute, hence shared by every instance: ``__init__`` pushes the new
    # cursor and ``close`` pops the top of the stack.
    _cursors_stack: list[TestCursor] = []

    def __init__(self, cursor: Cursor, lock: threading.RLock, readonly: bool):
        """ Wrap ``cursor`` and take ``lock`` until the test cursor is closed.

        :param cursor: the real cursor on which the queries are executed
        :param lock: lock serializing concurrent requests; it is acquired here
            and released by :meth:`close`
        :param readonly: whether the cursor must behave as a read-only one
        :raise Exception: when the lock cannot be acquired within the current
            test's ``test_cursor_lock_timeout``, or when a read/write cursor is
            opened on top of a read-only one holding a savepoint
        """
        assert isinstance(cursor, BaseCursor)
        super().__init__()
        self._now: datetime | None = None
        self._closed: bool = False
        self._cursor = cursor
        self.readonly = readonly
        # we use a lock to serialize concurrent requests
        self._lock = lock
        current_test = odoo.modules.module.current_test
        assert current_test, 'Test Cursor without active test ?'
        current_test.assertCanOpenTestCursor()
        lock_timeout = current_test.test_cursor_lock_timeout
        if not self._lock.acquire(timeout=lock_timeout):
            raise Exception(f'Unable to acquire lock for test cursor after {lock_timeout}s')
        try:
            # Check after acquiring in case current_test has changed.
            # This can happen if the request was hanging between two tests.
            current_test.assertCanOpenTestCursor()
            self._check_cursor_readonly()
        except Exception:
            # the cursor will never be closed, do not keep the lock
            self._lock.release()
            raise
        self._cursors_stack.append(self)
        # in order to simulate commit and rollback, the cursor maintains a
        # savepoint at its last commit, the savepoint is created lazily
        self._savepoint: Savepoint | None = None

    def _check_cursor_readonly(self):
        """ Refuse to open a read/write cursor on top of a read-only cursor
        that currently holds a savepoint.

        :raise Exception: when the cursor at the top of the stack is read-only
            with an active savepoint and ``self`` is not read-only
        """
        last_cursor = self._cursors_stack[-1] if self._cursors_stack else None
        if last_cursor and last_cursor.readonly and not self.readonly and last_cursor._savepoint:
            raise Exception('Opening a read/write test cursor from a readonly one')

    def _check_savepoint(self) -> None:
        """ Lazily create the savepoint that backs the simulated transaction. """
        if not self._savepoint:
            # we use self._cursor._obj for the savepoint to avoid having the
            # savepoint queries in the query counts, profiler, ...
            # Those queries are tests artefacts and should be invisible.
            self._savepoint = Savepoint(self._cursor._obj)
            if self.readonly:
                # this will simulate a readonly connection
                self._cursor._obj.execute('SET TRANSACTION READ ONLY')  # use _obj to avoid impacting query count and profiler.

    def execute(self, *args, **kwargs) -> None:
        """ Execute a query on the wrapped cursor, inside the savepoint.

        The arguments are passed through unchanged to the wrapped cursor's
        ``execute``, and its result is returned.
        """
        assert not self._closed, "Cannot use a closed cursor"
        self._check_savepoint()
        return self._cursor.execute(*args, **kwargs)

    def close(self) -> None:
        """ Simulate closing: roll back, drop the savepoint, pop the cursor
        from the stack and release the lock. Closing twice is a no-op.
        """
        if self._closed:
            return
        try:
            self.rollback()
            # the rollback hooks may have run queries, and thereby created a
            # new savepoint
            if self._savepoint:
                self._savepoint.close(rollback=False)
        finally:
            # always performed, even when the rollback fails, so that the
            # stack and the lock are not left in an inconsistent state
            self._closed = True

            tos = self._cursors_stack.pop()
            if tos is not self:
                _logger.warning("Found different un-closed cursor when trying to close %s: %s", self, tos)
            self._lock.release()

    def commit(self) -> None:
        """ Perform an SQL `COMMIT`

        The commit is simulated by closing the current savepoint; the
        savepoint is closed with ``rollback=True`` when the cursor is
        read-only.
        """
        self.flush()
        if self._savepoint:
            self._savepoint.close(rollback=self.readonly)
            self._savepoint = None
        self.clear()
        self.prerollback.clear()
        self.postrollback.clear()
        self.postcommit.clear()         # TestCursor ignores post-commit hooks by default

    def rollback(self) -> None:
        """ Perform an SQL `ROLLBACK`

        The rollback is simulated by rolling back the current savepoint, if
        any. Pre-rollback hooks run before it and post-rollback hooks after
        it; post-commit hooks are discarded.
        """
        self.clear()
        self.postcommit.clear()
        self.prerollback.run()
        if self._savepoint:
            self._savepoint.close(rollback=True)
            self._savepoint = None
        self.postrollback.run()

    def __getattr__(self, name):
        """ Delegate the lookup of unknown attributes to the wrapped cursor.

        :raise AttributeError: when ``_cursor`` itself is looked up, which
            only happens if it has not been assigned on the instance
        """
        if name == '_cursor':
            # ``__getattr__`` is only invoked when the regular lookup fails:
            # getting here means ``_cursor`` is not set (partially initialized
            # or copied instance). Delegating would look up ``self._cursor``
            # again and recurse until RecursionError.
            raise AttributeError(name)
        return getattr(self._cursor, name)

    def dictfetchone(self):
        """ Return the first row as a dict (column_name -> value) or None if no rows are available. """
        return self._cursor.dictfetchone()

    def dictfetchmany(self, size):
        """ Forward to the wrapped cursor's ``dictfetchmany`` with ``size``. """
        return self._cursor.dictfetchmany(size)

    def dictfetchall(self):
        """ Forward to the wrapped cursor's ``dictfetchall``. """
        return self._cursor.dictfetchall()

    def now(self) -> datetime:
        """ Return the transaction's timestamp ``datetime.now()``.

        The value is computed on the first call and cached on the cursor.
        """
        if self._now is None:
            self._now = datetime.now()
        return self._now