Coverage for adhoc-cicd-odoo-odoo / odoo / sql_db.py: 66%

484 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# Part of Odoo. See LICENSE file for full copyright and licensing details. 

2 

3 

4""" 

5The PostgreSQL connector is a connectivity layer between the OpenERP code and 

6the database, *not* a database abstraction toolkit. Database abstraction is what 

7the ORM does, in fact. 

8""" 

9from __future__ import annotations 

10 

11import logging 

12import os 

13import re 

14import threading 

15import time 

16import typing 

17import uuid 

18import warnings 

19from contextlib import contextmanager 

20from datetime import datetime, timedelta 

21from inspect import currentframe 

22 

23import psycopg2 

24import psycopg2.extensions 

25import psycopg2.extras 

26from psycopg2.extensions import ISOLATION_LEVEL_REPEATABLE_READ 

27from psycopg2.pool import PoolError 

28from psycopg2.sql import Composable 

29from werkzeug import urls 

30 

31import odoo 

32 

33from . import tools 

34from .release import MIN_PG_VERSION 

35from .tools import config, SQL 

36from .tools.func import frame_codeinfo, locked 

37from .tools.misc import Callbacks, real_time 

38 

39if typing.TYPE_CHECKING: 

40 from collections.abc import Iterable, Iterator 

41 from odoo.orm.environments import Transaction 

42 

43 T = typing.TypeVar('T') 

44 

45 # when type checking, the BaseCursor exposes methods of the psycopg cursor 

46 _CursorProtocol = psycopg2.extensions.cursor 

47else: 

48 _CursorProtocol = object 

49 

50 

51def undecimalize(value, cr) -> float | None: 

52 if value is None: 

53 return None 

54 return float(value) 

55 

56 

57DECIMAL_TO_FLOAT_TYPE = psycopg2.extensions.new_type((1700,), 'float', undecimalize) 

58psycopg2.extensions.register_type(DECIMAL_TO_FLOAT_TYPE) 

59psycopg2.extensions.register_type(psycopg2.extensions.new_array_type((1231,), 'float[]', DECIMAL_TO_FLOAT_TYPE)) 

60 

61_logger = logging.getLogger(__name__) 

62_logger_conn = _logger.getChild("connection") 

63 

64re_from = re.compile(r'\bfrom\s+"?([a-zA-Z_0-9]+)\b', re.IGNORECASE) 

65re_into = re.compile(r'\binto\s+"?([a-zA-Z_0-9]+)\b', re.IGNORECASE) 

66 

67 

68def categorize_query(decoded_query: str) -> tuple[typing.Literal['from', 'into'], str] | tuple[typing.Literal['other'], None]: 

69 res_into = re_into.search(decoded_query) 

70 # prioritize `insert` over `select` so `select` subqueries are not 

71 # considered when inside a `insert` 

72 if res_into: 

73 return 'into', res_into.group(1) 

74 

75 res_from = re_from.search(decoded_query) 

76 if res_from: 

77 return 'from', res_from.group(1) 

78 

79 return 'other', None 

80 

81 

82sql_counter: int = 0 

83 

84MAX_IDLE_TIMEOUT = 60 * 10 

85 

86 

87class Savepoint: 

88 """ Reifies an active breakpoint, allows :meth:`BaseCursor.savepoint` users 

89 to internally rollback the savepoint (as many times as they want) without 

90 having to implement their own savepointing, or triggering exceptions. 

91 

92 Should normally be created using :meth:`BaseCursor.savepoint` rather than 

93 directly. 

94 

95 The savepoint will be rolled back on unsuccessful context exits 

96 (exceptions). It will be released ("committed") on successful context exit. 

97 The savepoint object can be wrapped in ``contextlib.closing`` to 

98 unconditionally roll it back. 

99 

100 The savepoint can also safely be explicitly closed during context body. This 

101 will rollback by default. 

102 

103 :param BaseCursor cr: the cursor to execute the `SAVEPOINT` queries on 

104 """ 

105 

106 def __init__(self, cr: _CursorProtocol): 

107 self.name = str(uuid.uuid1()) 

108 self._cr = cr 

109 self.closed: bool = False 

110 cr.execute('SAVEPOINT "%s"' % self.name) 

111 

112 def __enter__(self): 

113 return self 

114 

115 def __exit__(self, exc_type, exc_val, exc_tb): 

116 self.close(rollback=exc_type is not None) 

117 

118 def close(self, *, rollback: bool = True): 

119 if not self.closed: 119 ↛ exitline 119 didn't return from function 'close' because the condition on line 119 was always true

120 self._close(rollback) 

121 

122 def rollback(self): 

123 self._cr.execute('ROLLBACK TO SAVEPOINT "%s"' % self.name) 

124 

125 def _close(self, rollback: bool): 

126 if rollback: 

127 self.rollback() 

128 self._cr.execute('RELEASE SAVEPOINT "%s"' % self.name) 

129 self.closed = True 

130 

131 

132class _FlushingSavepoint(Savepoint): 

133 def __init__(self, cr: BaseCursor): 

134 cr.flush() 

135 super().__init__(cr) 

136 

137 def rollback(self): 

138 assert isinstance(self._cr, BaseCursor) 

139 self._cr.clear() 

140 super().rollback() 

141 

142 def _close(self, rollback: bool): 

143 assert isinstance(self._cr, BaseCursor) 

144 try: 

145 if not rollback: 

146 self._cr.flush() 

147 except Exception: 

148 rollback = True 

149 raise 

150 finally: 

151 super()._close(rollback) 

152 

153 

154# _CursorProtocol declares the available methods and type information, 

155# at runtime, it is just an `object` 

156class BaseCursor(_CursorProtocol): 

157 """ Base class for cursors that manage pre/post commit hooks. """ 

158 IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit 

159 

160 transaction: Transaction | None 

161 cache: dict[typing.Any, typing.Any] 

162 dbname: str 

163 

164 def __init__(self) -> None: 

165 self.precommit = Callbacks() 

166 self.postcommit = Callbacks() 

167 self.prerollback = Callbacks() 

168 self.postrollback = Callbacks() 

169 self._now: datetime | None = None 

170 self.cache = {} 

171 # By default a cursor has no transaction object. A transaction object 

172 # for managing environments is instantiated by registry.cursor(). It 

173 # is not done here in order to avoid cyclic module dependencies. 

174 self.transaction = None 

175 

176 def flush(self) -> None: 

177 """ Flush the current transaction, and run precommit hooks. """ 

178 # In case some pre-commit added another pre-commit or triggered changes 

179 # in the ORM, we must flush and run it again. 

180 for _ in range(10): # limit number of iterations 180 ↛ 187line 180 didn't jump to line 187 because the loop on line 180 didn't complete

181 if self.transaction is not None: 

182 self.transaction.flush() 

183 if not self.precommit: 

184 break 

185 self.precommit.run() 

186 else: 

187 _logger.warning("Too many iterations for flushing the cursor!") 

188 

189 def clear(self) -> None: 

190 """ Clear the current transaction, and clear precommit hooks. """ 

191 if self.transaction is not None: 

192 self.transaction.clear() 

193 self.precommit.clear() 

194 

195 def reset(self) -> None: 

196 """ Reset the current transaction (this invalidates more that clear()). 

197 This method should be called only right after commit() or rollback(). 

198 """ 

199 if self.transaction is not None: 

200 self.transaction.reset() 

201 

202 def execute(self, query, params=None, log_exceptions: bool = True) -> None: 

203 """ Execute a query inside the current transaction. 

204 """ 

205 raise NotImplementedError 

206 

207 def commit(self) -> None: 

208 """ Commit the current transaction. 

209 """ 

210 raise NotImplementedError 

211 

212 def rollback(self) -> None: 

213 """ Rollback the current transaction. 

214 """ 

215 raise NotImplementedError 

216 

217 def savepoint(self, flush: bool = True) -> Savepoint: 

218 """context manager entering in a new savepoint 

219 

220 With ``flush`` (the default), will automatically run (or clear) the 

221 relevant hooks. 

222 """ 

223 if flush: 

224 return _FlushingSavepoint(self) 

225 else: 

226 return Savepoint(self) 

227 

228 def __enter__(self): 

229 """ Using the cursor as a contextmanager automatically commits and 

230 closes it:: 

231 

232 with cr: 

233 cr.execute(...) 

234 

235 # cr is committed if no failure occurred 

236 # cr is closed in any case 

237 """ 

238 return self 

239 

240 def __exit__(self, exc_type, exc_value, traceback): 

241 try: 

242 if exc_type is None: 242 ↛ 245line 242 didn't jump to line 245 because the condition on line 242 was always true

243 self.commit() 

244 finally: 

245 self.close() 

246 

247 def dictfetchone(self) -> dict[str, typing.Any] | None: 

248 """ Return the first row as a dict (column_name -> value) or None if no rows are available. """ 

249 raise NotImplementedError 

250 

251 def dictfetchmany(self, size: int) -> list[dict[str, typing.Any]]: 

252 res: list[dict[str, typing.Any]] = [] 

253 while size > 0 and (row := self.dictfetchone()) is not None: 

254 res.append(row) 

255 size -= 1 

256 return res 

257 

258 def dictfetchall(self) -> list[dict[str, typing.Any]]: 

259 """ Return all rows as dicts (column_name -> value). """ 

260 res: list[dict[str, typing.Any]] = [] 

261 while (row := self.dictfetchone()) is not None: 

262 res.append(row) 

263 return res 

264 

265 def split_for_in_conditions(self, ids: Iterable[T], size: int = 0) -> Iterator[tuple[T, ...]]: 

266 """Split a list of identifiers into one or more smaller tuples 

267 safe for IN conditions, after uniquifying them.""" 

268 warnings.warn("Deprecated since 19.0, use split_every(cr.IN_MAX, ids)", DeprecationWarning) 

269 return tools.misc.split_every(size or self.IN_MAX, ids) 

270 

271 def now(self) -> datetime: 

272 """ Return the transaction's timestamp ``NOW() AT TIME ZONE 'UTC'``. """ 

273 if self._now is None: 

274 self.execute("SELECT (now() AT TIME ZONE 'UTC')") 

275 row = self.fetchone() 

276 assert row 

277 self._now = row[0] 

278 return self._now 

279 

280 

281class Cursor(BaseCursor): 

282 """Represents an open transaction to the PostgreSQL DB backend, 

283 acting as a lightweight wrapper around psycopg2's 

284 ``cursor`` objects. 

285 

286 ``Cursor`` is the object behind the ``cr`` variable used all 

287 over the OpenERP code. 

288 

289 .. rubric:: Transaction Isolation 

290 

291 One very important property of database transactions is the 

292 level of isolation between concurrent transactions. 

293 The SQL standard defines four levels of transaction isolation, 

294 ranging from the most strict *Serializable* level, to the least 

295 strict *Read Uncommitted* level. These levels are defined in 

296 terms of the phenomena that must not occur between concurrent 

297 transactions, such as *dirty read*, etc. 

298 In the context of a generic business data management software 

299 such as OpenERP, we need the best guarantees that no data 

300 corruption can ever be cause by simply running multiple 

301 transactions in parallel. Therefore, the preferred level would 

302 be the *serializable* level, which ensures that a set of 

303 transactions is guaranteed to produce the same effect as 

304 running them one at a time in some order. 

305 

306 However, most database management systems implement a limited 

307 serializable isolation in the form of 

308 `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_, 

309 providing most of the same advantages as True Serializability, 

310 with a fraction of the performance cost. 

311 With PostgreSQL up to version 9.0, this snapshot isolation was 

312 the implementation of both the ``REPEATABLE READ`` and 

313 ``SERIALIZABLE`` levels of the SQL standard. 

314 As of PostgreSQL 9.1, the previous snapshot isolation implementation 

315 was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE`` 

316 level was introduced, providing some additional heuristics to 

317 detect a concurrent update by parallel transactions, and forcing 

318 one of them to rollback. 

319 

320 OpenERP implements its own level of locking protection 

321 for transactions that are highly likely to provoke concurrent 

322 updates, such as stock reservations or document sequences updates. 

323 Therefore we mostly care about the properties of snapshot isolation, 

324 but we don't really need additional heuristics to trigger transaction 

325 rollbacks, as we are taking care of triggering instant rollbacks 

326 ourselves when it matters (and we can save the additional performance 

327 hit of these heuristics). 

328 

329 As a result of the above, we have selected ``REPEATABLE READ`` as 

330 the default transaction isolation level for OpenERP cursors, as 

331 it will be mapped to the desired ``snapshot isolation`` level for 

332 all supported PostgreSQL version (>10). 

333 

334 .. attribute:: cache 

335 

336 Cache dictionary with a "request" (-ish) lifecycle, only lives as 

337 long as the cursor itself does and proactively cleared when the 

338 cursor is closed. 

339 

340 This cache should *only* be used to store repeatable reads as it 

341 ignores rollbacks and savepoints, it should not be used to store 

342 *any* data which may be modified during the life of the cursor. 

343 

344 """ 

345 sql_from_log: dict[str, tuple[int, float]] 

346 sql_into_log: dict[str, tuple[int, float]] 

347 sql_log_count: int 

348 

349 def __init__(self, pool: ConnectionPool, dbname: str, dsn: dict): 

350 super().__init__() 

351 self.sql_from_log = {} 

352 self.sql_into_log = {} 

353 

354 # default log level determined at cursor creation, could be 

355 # overridden later for debugging purposes 

356 self.sql_log_count = 0 

357 

358 # avoid the call of close() (by __del__) if an exception 

359 # is raised by any of the following initializations 

360 self._closed: bool = True 

361 

362 self.__pool: ConnectionPool = pool 

363 self.dbname = dbname 

364 

365 self._cnx: PsycoConnection = pool.borrow(dsn) 

366 self._obj: psycopg2.extensions.cursor = self._cnx.cursor() 

367 if _logger.isEnabledFor(logging.DEBUG): 367 ↛ 368line 367 didn't jump to line 368 because the condition on line 367 was never true

368 self.__caller = frame_codeinfo(currentframe(), 2) 

369 else: 

370 self.__caller = False 

371 self._closed = False # real initialization value 

372 # See the docstring of this class. 

373 self.connection.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ) 

374 self.connection.set_session(readonly=pool.readonly) 

375 

376 if os.getenv('ODOO_FAKETIME_TEST_MODE') and self.dbname in tools.config['db_name']: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 self.execute("SET search_path = public, pg_catalog;") 

378 self.commit() # ensure that the search_path remains after a rollback 

379 

380 def __build_dict(self, row: tuple) -> dict[str, typing.Any]: 

381 description = self._obj.description 

382 assert description, "Query does not have results" 

383 return {column.name: row[index] for index, column in enumerate(description)} 

384 

385 def dictfetchone(self) -> dict[str, typing.Any] | None: 

386 row = self._obj.fetchone() 

387 return self.__build_dict(row) if row else None 

388 

389 def dictfetchmany(self, size) -> list[dict[str, typing.Any]]: 

390 return [self.__build_dict(row) for row in self._obj.fetchmany(size)] 

391 

392 def dictfetchall(self) -> list[dict[str, typing.Any]]: 

393 return [self.__build_dict(row) for row in self._obj.fetchall()] 

394 

395 def __del__(self): 

396 if not self._closed and not self._cnx.closed: 396 ↛ 402line 396 didn't jump to line 402 because the condition on line 396 was never true

397 # Oops. 'self' has not been closed explicitly. 

398 # The cursor will be deleted by the garbage collector, 

399 # but the database connection is not put back into the connection 

400 # pool, preventing some operation on the database like dropping it. 

401 # This can also lead to a server overload. 

402 msg = "Cursor not closed explicitly\n" 

403 if self.__caller: 

404 msg += "Cursor was created at %s:%s" % self.__caller 

405 else: 

406 msg += "Please enable sql debugging to trace the caller." 

407 _logger.warning(msg) 

408 self._close(True) 

409 

410 def _format(self, query, params=None) -> str: 

411 encoding = psycopg2.extensions.encodings[self.connection.encoding] 

412 return self.mogrify(query, params).decode(encoding, 'replace') 

413 

414 def mogrify(self, query, params=None) -> bytes: 

415 if isinstance(query, SQL): 415 ↛ 418line 415 didn't jump to line 418 because the condition on line 415 was always true

416 assert params is None, "Unexpected parameters for SQL query object" 

417 query, params = query.code, query.params 

418 return self._obj.mogrify(query, params) 

419 

420 def execute(self, query, params=None, log_exceptions: bool = True) -> None: 

421 global sql_counter 

422 

423 if isinstance(query, SQL): 

424 assert params is None, "Unexpected parameters for SQL query object" 

425 query, params = query.code, query.params 

426 

427 if params and not isinstance(params, (tuple, list, dict)): 427 ↛ 429line 427 didn't jump to line 429 because the condition on line 427 was never true

428 # psycopg2's TypeError is not clear if you mess up the params 

429 raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,)) 

430 

431 start = real_time() 

432 try: 

433 self._obj.execute(query, params) 

434 except Exception as e: 

435 if log_exceptions: 

436 _logger.error("bad query: %s\nERROR: %s", self._obj.query or query, e) 

437 raise 

438 finally: 

439 delay = real_time() - start 

440 if _logger.isEnabledFor(logging.DEBUG): 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true

441 _logger.debug("[%.3f ms] query: %s", 1000 * delay, self._format(query, params)) 

442 

443 # simple query count is always computed 

444 self.sql_log_count += 1 

445 sql_counter += 1 

446 

447 current_thread = threading.current_thread() 

448 if hasattr(current_thread, 'query_count'): 448 ↛ 449line 448 didn't jump to line 449 because the condition on line 448 was never true

449 current_thread.query_count += 1 

450 if hasattr(current_thread, 'query_time'): 450 ↛ 451line 450 didn't jump to line 451 because the condition on line 450 was never true

451 current_thread.query_time += delay 

452 

453 # optional hooks for performance and tracing analysis 

454 for hook in getattr(current_thread, 'query_hooks', ()): 454 ↛ 455line 454 didn't jump to line 455 because the loop on line 454 never started

455 hook(self, query, params, start, delay) 

456 

457 # advanced stats 

458 if _logger.isEnabledFor(logging.DEBUG): 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true

459 if obj_query := self._obj.query: 

460 query = obj_query.decode() 

461 query_type, table = categorize_query(query) 

462 log_target = None 

463 if query_type == 'into': 

464 log_target = self.sql_into_log 

465 elif query_type == 'from': 

466 log_target = self.sql_from_log 

467 if log_target: 

468 stat_count, stat_time = log_target.get(table or '', (0, 0)) 

469 log_target[table or ''] = (stat_count + 1, stat_time + delay * 1E6) 

470 return None 

471 

472 def execute_values(self, query, argslist, template=None, page_size=100, fetch=False): 

473 """ 

474 A proxy for psycopg2.extras.execute_values which can log all queries like execute. 

475 But this method cannot set log_exceptions=False like execute 

476 """ 

477 # Odoo Cursor only proxies all methods of psycopg2 Cursor. This is a patch for problems caused by passing 

478 # self instead of self._obj to the first parameter of psycopg2.extras.execute_values. 

479 if isinstance(query, Composable): 

480 query = query.as_string(self._obj) 

481 return psycopg2.extras.execute_values(self, query, argslist, template=template, page_size=page_size, fetch=fetch) 

482 

483 def print_log(self) -> None: 

484 global sql_counter 

485 

486 if not _logger.isEnabledFor(logging.DEBUG): 486 ↛ 489line 486 didn't jump to line 489 because the condition on line 486 was always true

487 return 

488 

489 def process(log_type: str): 

490 sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log} 

491 sqllog = sqllogs[log_type] 

492 total = 0.0 

493 if sqllog: 

494 _logger.debug("SQL LOG %s:", log_type) 

495 for table, (stat_count, stat_time) in sorted(sqllog.items(), key=lambda k: k[1]): 

496 delay = timedelta(microseconds=stat_time) 

497 _logger.debug("table: %s: %s/%s", table, delay, stat_count) 

498 total += stat_time 

499 sqllog.clear() 

500 total_delay = timedelta(microseconds=total) 

501 _logger.debug("SUM %s:%s/%d [%d]", log_type, total_delay, self.sql_log_count, sql_counter) 

502 

503 process('from') 

504 process('into') 

505 self.sql_log_count = 0 

506 

507 @contextmanager 

508 def _enable_logging(self): 

509 """ Forcefully enables logging for this cursor, restores it afterwards. 

510 

511 Updates the logger in-place, so not thread-safe. 

512 """ 

513 level = _logger.level 

514 _logger.setLevel(logging.DEBUG) 

515 try: 

516 yield 

517 finally: 

518 _logger.setLevel(level) 

519 

520 def close(self) -> None: 

521 if not self.closed: 521 ↛ exitline 521 didn't return from function 'close' because the condition on line 521 was always true

522 return self._close(False) 

523 

524 def _close(self, leak: bool = False) -> None: 

525 if not self._obj: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true

526 return 

527 

528 self.cache.clear() 

529 

530 # advanced stats only at logging.DEBUG level 

531 self.print_log() 

532 

533 self._obj.close() 

534 

535 # This force the cursor to be freed, and thus, available again. It is 

536 # important because otherwise we can overload the server very easily 

537 # because of a cursor shortage (because cursors are not garbage 

538 # collected as fast as they should). The problem is probably due in 

539 # part because browse records keep a reference to the cursor. 

540 del self._obj 

541 

542 # Clean the underlying connection, and run rollback hooks. 

543 self.rollback() 

544 

545 self._closed = True 

546 

547 if leak: 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true

548 self._cnx.leaked = True # type: ignore 

549 else: 

550 chosen_template = tools.config['db_template'] 

551 keep_in_pool = self.dbname not in ('template0', 'template1', 'postgres', chosen_template) 

552 self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool) 

553 

554 def commit(self) -> None: 

555 """ Perform an SQL `COMMIT` """ 

556 self.flush() 

557 self._cnx.commit() 

558 self.clear() 

559 self._now = None 

560 self.prerollback.clear() 

561 self.postrollback.clear() 

562 self.postcommit.run() 

563 

564 def rollback(self) -> None: 

565 """ Perform an SQL `ROLLBACK` """ 

566 self.clear() 

567 self.postcommit.clear() 

568 self.prerollback.run() 

569 self._cnx.rollback() 

570 self._now = None 

571 self.postrollback.run() 

572 

573 def __getattr__(self, name): 

574 if self._closed and name == '_obj': 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true

575 raise psycopg2.InterfaceError("Cursor already closed") 

576 return getattr(self._obj, name) 

577 

578 @property 

579 def closed(self) -> bool: 

580 return self._closed or bool(self._cnx.closed) 

581 

582 @property 

583 def readonly(self) -> bool: 

584 return bool(self._cnx.readonly) 

585 

586 

587class PsycoConnection(psycopg2.extensions.connection): 

588 _pool_in_use: bool = False 

589 _pool_last_used: float = 0 

590 

591 def lobject(*args, **kwargs): 

592 pass 

593 

594 if hasattr(psycopg2.extensions, 'ConnectionInfo'): 594 ↛ exitline 594 didn't exit class 'PsycoConnection' because the condition on line 594 was always true

595 @property 

596 def info(self): 

597 class PsycoConnectionInfo(psycopg2.extensions.ConnectionInfo): 

598 @property 

599 def password(self): 

600 pass 

601 return PsycoConnectionInfo(self) 

602 

603 

604class ConnectionPool: 

605 """ The pool of connections to database(s) 

606 

607 Keep a set of connections to pg databases open, and reuse them 

608 to open cursors for all transactions. 

609 

610 The connections are *not* automatically closed. Only a close_db() 

611 can trigger that. 

612 """ 

613 _connections: list[PsycoConnection] 

614 

615 def __init__(self, maxconn: int = 64, readonly: bool = False): 

616 self._connections = [] 

617 self._maxconn = max(maxconn, 1) 

618 self._readonly = readonly 

619 self._lock = threading.Lock() 

620 

621 def __repr__(self): 

622 used = sum(1 for c in self._connections if c._pool_in_use) 

623 count = len(self._connections) 

624 mode = 'read-only' if self._readonly else 'read/write' 

625 return f"ConnectionPool({mode};used={used}/count={count}/max={self._maxconn})" 

626 

627 @property 

628 def readonly(self) -> bool: 

629 return self._readonly 

630 

631 def _debug(self, msg: str, *args): 

632 _logger_conn.debug(('%r ' + msg), self, *args) 

633 

634 @locked 

635 def borrow(self, connection_info: dict) -> PsycoConnection: 

636 """ 

637 Borrow a PsycoConnection from the pool. If no connection is available, create a new one 

638 as long as there are still slots available. Perform some garbage-collection in the pool: 

639 idle, dead and leaked connections are removed. 

640 

641 :param dict connection_info: dict of psql connection keywords 

642 :rtype: PsycoConnection 

643 """ 

644 # free idle, dead and leaked connections 

645 for i, cnx in tools.reverse_enumerate(self._connections): 

646 if not cnx._pool_in_use and not cnx.closed and time.time() - cnx._pool_last_used > MAX_IDLE_TIMEOUT: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true

647 self._debug('Close connection at index %d: %r', i, cnx.dsn) 

648 cnx.close() 

649 if cnx.closed: 649 ↛ 650line 649 didn't jump to line 650 because the condition on line 649 was never true

650 self._connections.pop(i) 

651 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn) 

652 continue 

653 if getattr(cnx, 'leaked', False): 653 ↛ 654line 653 didn't jump to line 654 because the condition on line 653 was never true

654 delattr(cnx, 'leaked') 

655 cnx._pool_in_use = False 

656 _logger.info('%r: Free leaked connection to %r', self, cnx.dsn) 

657 

658 for i, cnx in enumerate(self._connections): 

659 if not cnx._pool_in_use and self._dsn_equals(cnx.dsn, connection_info): 

660 try: 

661 cnx.reset() 

662 except psycopg2.OperationalError: 

663 self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn) 

664 # psycopg2 2.4.4 and earlier do not allow closing a closed connection 

665 if not cnx.closed: 

666 cnx.close() 

667 continue 

668 cnx._pool_in_use = True 

669 self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i) 

670 

671 return cnx 

672 

673 if len(self._connections) >= self._maxconn: 673 ↛ 675line 673 didn't jump to line 675 because the condition on line 673 was never true

674 # try to remove the oldest connection not used 

675 for i, cnx in enumerate(self._connections): 

676 if not cnx._pool_in_use: 

677 self._connections.pop(i) 

678 if not cnx.closed: 

679 cnx.close() 

680 self._debug('Removing old connection at index %d: %r', i, cnx.dsn) 

681 break 

682 else: 

683 # note: this code is called only if the for loop has completed (no break) 

684 raise PoolError('The Connection Pool Is Full') 

685 

686 try: 

687 result = psycopg2.connect( 

688 connection_factory=PsycoConnection, 

689 **connection_info) 

690 except psycopg2.Error: 

691 _logger.info('Connection to the database failed') 

692 raise 

693 if result.server_version < MIN_PG_VERSION * 10000: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true

694 warnings.warn(f"Postgres version is {result.server_version}, lower than minimum required {MIN_PG_VERSION * 10000}") 

695 result._pool_in_use = True 

696 self._connections.append(result) 

697 self._debug('Create new connection backend PID %d', result.get_backend_pid()) 

698 

699 return result 

700 

701 @locked 

702 def give_back(self, connection: PsycoConnection, keep_in_pool: bool = True): 

703 self._debug('Give back connection to %r', connection.dsn) 

704 try: 

705 index = self._connections.index(connection) 

706 except ValueError: 

707 raise PoolError('This connection does not belong to the pool') 

708 

709 if keep_in_pool: 

710 # Release the connection and record the last time used 

711 connection._pool_in_use = False 

712 connection._pool_last_used = time.time() 

713 self._debug('Put connection to %r in pool', connection.dsn) 

714 else: 

715 cnx = self._connections.pop(index) 

716 self._debug('Forgot connection to %r', cnx.dsn) 

717 cnx.close() 

718 

719 @locked 

720 def close_all(self, dsn: dict | str | None = None): 

721 count = 0 

722 last = None 

723 for i, cnx in tools.reverse_enumerate(self._connections): 

724 if dsn is None or self._dsn_equals(cnx.dsn, dsn): 724 ↛ 723line 724 didn't jump to line 723 because the condition on line 724 was always true

725 cnx.close() 

726 last = self._connections.pop(i) 

727 count += 1 

728 if count: 

729 _logger.info('%r: Closed %d connections %s', self, count, 

730 (dsn and last and 'to %r' % last.dsn) or '') 

731 

732 def _dsn_equals(self, dsn1: dict | str, dsn2: dict | str) -> bool: 

733 alias_keys = {'dbname': 'database'} 

734 ignore_keys = ['password'] 

735 dsn1, dsn2 = ({ 

736 alias_keys.get(key, key): str(value) 

737 for key, value in (psycopg2.extensions.parse_dsn(dsn) if isinstance(dsn, str) else dsn).items() 

738 if key not in ignore_keys 

739 } for dsn in (dsn1, dsn2)) 

740 return dsn1 == dsn2 

741 

742 

743class Connection: 

744 """ A lightweight instance of a connection to postgres 

745 """ 

746 def __init__(self, pool: ConnectionPool, dbname: str, dsn: dict): 

747 self.__dbname = dbname 

748 self.__dsn = dsn 

749 self.__pool = pool 

750 

751 @property 

752 def dsn(self) -> dict: 

753 dsn = dict(self.__dsn) 

754 dsn.pop('password', None) 

755 return dsn 

756 

757 @property 

758 def dbname(self) -> str: 

759 return self.__dbname 

760 

761 def cursor(self) -> Cursor: 

762 _logger.debug('create cursor to %r', self.dsn) 

763 return Cursor(self.__pool, self.__dbname, self.__dsn) 

764 

765 def __bool__(self): 

766 raise NotImplementedError() 

767 

768 

769def connection_info_for(db_or_uri: str, readonly=False) -> tuple[str, dict]: 

770 """ parse the given `db_or_uri` and return a 2-tuple (dbname, connection_params) 

771 

772 Connection params are either a dictionary with a single key ``dsn`` 

773 containing a connection URI, or a dictionary containing connection 

774 parameter keywords which psycopg2 can build a key/value connection string 

775 (dsn) from 

776 

777 :param str db_or_uri: database name or postgres dsn 

778 :param bool readonly: used to load 

779 the default configuration from ``db_`` or ``db_replica_``. 

780 :rtype: (str, dict) 

781 """ 

782 app_name = config['db_app_name'] 

783 if 'ODOO_PGAPPNAME' in os.environ: 783 ↛ 784line 783 didn't jump to line 784 because the condition on line 783 was never true

784 warnings.warn("Since 19.0, use PGAPPNAME instead of ODOO_PGAPPNAME", DeprecationWarning) 

785 app_name = os.environ['ODOO_PGAPPNAME'] 

786 # Using manual string interpolation for security reason and trimming at default NAMEDATALEN=63 

787 app_name = app_name.replace('{pid}', str(os.getpid()))[:63] 

788 if db_or_uri.startswith(('postgresql://', 'postgres://')): 788 ↛ 790line 788 didn't jump to line 790 because the condition on line 788 was never true

789 # extract db from uri 

790 us = urls.url_parse(db_or_uri) # type: ignore 

791 if len(us.path) > 1: 

792 db_name = us.path[1:] 

793 elif us.username: 

794 db_name = us.username 

795 else: 

796 db_name = us.hostname 

797 return db_name, {'dsn': db_or_uri, 'application_name': app_name} 

798 

799 connection_info = {'database': db_or_uri, 'application_name': app_name} 

800 for p in ('host', 'port', 'user', 'password', 'sslmode'): 

801 cfg = tools.config['db_' + p] 

802 if readonly: 

803 cfg = tools.config.get('db_replica_' + p) or cfg 

804 if cfg: 

805 connection_info[p] = cfg 

806 

807 return db_or_uri, connection_info 

808 

809 

810_Pool: ConnectionPool | None = None 

811_Pool_readonly: ConnectionPool | None = None 

812 

813 

814def db_connect(to: str, allow_uri=False, readonly=False) -> Connection: 

815 global _Pool, _Pool_readonly # noqa: PLW0603 (global-statement) 

816 

817 maxconn = (tools.config['db_maxconn_gevent'] if hasattr(odoo, 'evented') and odoo.evented else 0) or tools.config['db_maxconn'] 

818 _Pool_readonly if readonly else _Pool 

819 if readonly: 

820 if _Pool_readonly is None: 820 ↛ 822line 820 didn't jump to line 822 because the condition on line 820 was always true

821 _Pool_readonly = ConnectionPool(int(maxconn), readonly=True) 

822 pool = _Pool_readonly 

823 else: 

824 if _Pool is None: 

825 _Pool = ConnectionPool(int(maxconn), readonly=False) 

826 pool = _Pool 

827 

828 db, info = connection_info_for(to, readonly) 

829 if not allow_uri and db != to: 829 ↛ 830line 829 didn't jump to line 830 because the condition on line 829 was never true

830 raise ValueError('URI connections not allowed') 

831 return Connection(pool, db, info) 

832 

833 

834def close_db(db_name: str) -> None: 

835 """ You might want to call odoo.modules.registry.Registry.delete(db_name) along this function.""" 

836 if _Pool: 

837 _Pool.close_all(connection_info_for(db_name)[1]) 

838 if _Pool_readonly: 

839 _Pool_readonly.close_all(connection_info_for(db_name)[1]) 

840 

841 

842def close_all() -> None: 

843 if _Pool: 843 ↛ 845line 843 didn't jump to line 845 because the condition on line 843 was always true

844 _Pool.close_all() 

845 if _Pool_readonly: 845 ↛ exitline 845 didn't return from function 'close_all' because the condition on line 845 was always true

846 _Pool_readonly.close_all()