Coverage for adhoc-cicd-odoo-odoo / odoo / tests / test_cursor.py: 19%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1from __future__ import annotations 

2 

3from datetime import datetime 

4import threading 

5 

6from odoo.sql_db import BaseCursor, Cursor, Savepoint, _logger 

7import odoo 

8 

9 

class TestCursor(BaseCursor):
    """ A pseudo-cursor to be used for tests, on top of a real cursor. It keeps
    the transaction open across requests, and simulates committing, rolling
    back, and closing:

    +------------------------+---------------------------------------------------+
    | test cursor            | queries on actual cursor                          |
    +========================+===================================================+
    |``cr = TestCursor(...)``|                                                   |
    +------------------------+---------------------------------------------------+
    | ``cr.execute(query)``  | SAVEPOINT test_cursor_N (if not savepoint)        |
    |                        | query                                             |
    +------------------------+---------------------------------------------------+
    | ``cr.commit()``        | RELEASE SAVEPOINT test_cursor_N (if savepoint)    |
    +------------------------+---------------------------------------------------+
    | ``cr.rollback()``      | ROLLBACK TO SAVEPOINT test_cursor_N (if savepoint)|
    +------------------------+---------------------------------------------------+
    | ``cr.close()``         | ROLLBACK TO SAVEPOINT test_cursor_N (if savepoint)|
    |                        | RELEASE SAVEPOINT test_cursor_N (if savepoint)    |
    +------------------------+---------------------------------------------------+
    """
    # Class-level stack shared by every TestCursor instance: a cursor is
    # pushed when it is opened and the top entry is popped when one is closed.
    _cursors_stack: list[TestCursor] = []

    def __init__(self, cursor: Cursor, lock: threading.RLock, readonly: bool):
        """ Wrap ``cursor`` and take ``lock`` until :meth:`close` is called.

        :param cursor: the real cursor on which the queries are executed
        :param lock: lock used to serialize concurrent requests; it is
            acquired here and released by :meth:`close`
        :param readonly: whether a read-only transaction must be simulated
        :raise Exception: if the lock cannot be acquired within the current
            test's ``test_cursor_lock_timeout``, or if a read/write cursor is
            opened from a read-only one that already holds a savepoint
        """
        assert isinstance(cursor, BaseCursor)
        super().__init__()
        self._now: datetime | None = None
        self._closed: bool = False
        self._cursor = cursor
        self.readonly = readonly
        # we use a lock to serialize concurrent requests
        self._lock = lock
        current_test = odoo.modules.module.current_test
        assert current_test, 'Test Cursor without active test ?'
        current_test.assertCanOpenTestCursor()
        lock_timeout = current_test.test_cursor_lock_timeout
        if not self._lock.acquire(timeout=lock_timeout):
            raise Exception(f'Unable to acquire lock for test cursor after {lock_timeout}s')
        try:
            # Check after acquiring in case current_test has changed.
            # This can happen if the request was hanging between two tests.
            current_test.assertCanOpenTestCursor()
            self._check_cursor_readonly()
        except Exception:
            # the cursor is not usable: do not keep the lock
            self._lock.release()
            raise
        self._cursors_stack.append(self)
        # in order to simulate commit and rollback, the cursor maintains a
        # savepoint at its last commit, the savepoint is created lazily
        self._savepoint: Savepoint | None = None

    def _check_cursor_readonly(self):
        """ Refuse to open a read/write cursor on top of a read-only cursor
        that currently holds a savepoint.

        :raise Exception: when the cursor on top of the stack is read-only,
            has a savepoint, and ``self`` is not read-only
        """
        if not self._cursors_stack:
            return
        last_cursor = self._cursors_stack[-1]
        if last_cursor and last_cursor.readonly and not self.readonly and last_cursor._savepoint:
            raise Exception('Opening a read/write test cursor from a readonly one')

    def _check_savepoint(self) -> None:
        """ Lazily create the savepoint that backs the simulated transaction. """
        if not self._savepoint:
            # we use self._cursor._obj for the savepoint to avoid having the
            # savepoint queries in the query counts, profiler, ...
            # Those queries are tests artefacts and should be invisible.
            self._savepoint = Savepoint(self._cursor._obj)
            if self.readonly:
                # this will simulate a readonly connection
                self._cursor._obj.execute('SET TRANSACTION READ ONLY')  # use _obj to avoid impacting query count and profiler.

    def execute(self, *args, **kwargs) -> None:
        """ Execute a query on the wrapped cursor, after making sure that a
        savepoint exists. The arguments are passed through unchanged.
        """
        assert not self._closed, "Cannot use a closed cursor"
        self._check_savepoint()
        return self._cursor.execute(*args, **kwargs)

    def close(self) -> None:
        """ Roll back the pending changes, then mark the cursor as closed, pop
        the cursors stack and release the lock. Closing an already closed
        cursor does nothing.
        """
        if not self._closed:
            try:
                self.rollback()
                # rollback() resets the savepoint; one is only present here if
                # something re-created it in the meantime
                if self._savepoint:
                    self._savepoint.close(rollback=False)
            finally:
                # always executed, even when the rollback fails
                self._closed = True

                tos = self._cursors_stack.pop()
                if tos is not self:
                    _logger.warning("Found different un-closed cursor when trying to close %s: %s", self, tos)
                self._lock.release()

    def commit(self) -> None:
        """ Perform an SQL `COMMIT`

        The commit is simulated: the current savepoint, if any, is closed (and
        rolled back instead when the cursor is read-only), so that the next
        query starts a new one.
        """
        self.flush()
        if self._savepoint:
            self._savepoint.close(rollback=self.readonly)
            self._savepoint = None
        self.clear()
        self.prerollback.clear()
        self.postrollback.clear()
        self.postcommit.clear()         # TestCursor ignores post-commit hooks by default

    def rollback(self) -> None:
        """ Perform an SQL `ROLLBACK`

        The rollback is simulated: the current savepoint, if any, is rolled
        back and closed. The ``prerollback`` hooks run before that, and the
        ``postrollback`` hooks after; the ``postcommit`` hooks are discarded.
        """
        self.clear()
        self.postcommit.clear()
        self.prerollback.run()
        if self._savepoint:
            self._savepoint.close(rollback=True)
            self._savepoint = None
        self.postrollback.run()

    def __getattr__(self, name):
        """ Delegate the attributes that are not found on ``self`` to the
        wrapped cursor.
        """
        if name == '_cursor':
            # ``_cursor`` itself is missing (instance not initialized yet):
            # looking it up on ``self`` would re-enter this method forever.
            raise AttributeError(name)
        return getattr(self._cursor, name)

    def dictfetchone(self):
        """ Return the first row as a dict (column_name -> value) or None if no rows are available. """
        return self._cursor.dictfetchone()

    def dictfetchmany(self, size):
        """ Return the result of ``dictfetchmany(size)`` on the wrapped cursor. """
        return self._cursor.dictfetchmany(size)

    def dictfetchall(self):
        """ Return the result of ``dictfetchall()`` on the wrapped cursor. """
        return self._cursor.dictfetchall()

    def now(self) -> datetime:
        """ Return the transaction's timestamp ``datetime.now()``.

        The value is computed on the first call and cached on the cursor.
        """
        if self._now is None:
            self._now = datetime.now()
        return self._now