Coverage for adhoc-cicd-odoo-odoo / odoo / sql_db.py: 66%
484 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1# Part of Odoo. See LICENSE file for full copyright and licensing details.
4"""
5The PostgreSQL connector is a connectivity layer between the OpenERP code and
6the database, *not* a database abstraction toolkit. Database abstraction is what
7the ORM does, in fact.
8"""
9from __future__ import annotations
11import logging
12import os
13import re
14import threading
15import time
16import typing
17import uuid
18import warnings
19from contextlib import contextmanager
20from datetime import datetime, timedelta
21from inspect import currentframe
23import psycopg2
24import psycopg2.extensions
25import psycopg2.extras
26from psycopg2.extensions import ISOLATION_LEVEL_REPEATABLE_READ
27from psycopg2.pool import PoolError
28from psycopg2.sql import Composable
29from werkzeug import urls
31import odoo
33from . import tools
34from .release import MIN_PG_VERSION
35from .tools import config, SQL
36from .tools.func import frame_codeinfo, locked
37from .tools.misc import Callbacks, real_time
if typing.TYPE_CHECKING:
    from collections.abc import Iterable, Iterator
    from odoo.orm.environments import Transaction

    T = typing.TypeVar('T')

    # when type checking, the BaseCursor exposes methods of the psycopg cursor
    _CursorProtocol = psycopg2.extensions.cursor
else:
    # at runtime, BaseCursor only inherits from `object`; the psycopg cursor
    # methods are reached through delegation (see Cursor.__getattr__)
    _CursorProtocol = object
51def undecimalize(value, cr) -> float | None:
52 if value is None:
53 return None
54 return float(value)
# Make psycopg2 return PostgreSQL NUMERIC (type OID 1700) values as Python
# floats instead of Decimal, and NUMERIC[] (OID 1231) as lists of floats.
DECIMAL_TO_FLOAT_TYPE = psycopg2.extensions.new_type((1700,), 'float', undecimalize)
psycopg2.extensions.register_type(DECIMAL_TO_FLOAT_TYPE)
psycopg2.extensions.register_type(psycopg2.extensions.new_array_type((1231,), 'float[]', DECIMAL_TO_FLOAT_TYPE))
61_logger = logging.getLogger(__name__)
62_logger_conn = _logger.getChild("connection")
64re_from = re.compile(r'\bfrom\s+"?([a-zA-Z_0-9]+)\b', re.IGNORECASE)
65re_into = re.compile(r'\binto\s+"?([a-zA-Z_0-9]+)\b', re.IGNORECASE)
68def categorize_query(decoded_query: str) -> tuple[typing.Literal['from', 'into'], str] | tuple[typing.Literal['other'], None]:
69 res_into = re_into.search(decoded_query)
70 # prioritize `insert` over `select` so `select` subqueries are not
71 # considered when inside a `insert`
72 if res_into:
73 return 'into', res_into.group(1)
75 res_from = re_from.search(decoded_query)
76 if res_from:
77 return 'from', res_from.group(1)
79 return 'other', None
# process-wide count of executed SQL queries, across all cursors
sql_counter: int = 0

# connections idle in the pool for longer than this (seconds) are closed
# during ConnectionPool.borrow()
MAX_IDLE_TIMEOUT = 60 * 10
class Savepoint:
    """ Reifies an active savepoint, allowing :meth:`BaseCursor.savepoint`
    users to roll the savepoint back internally (as many times as they want)
    without implementing their own savepointing or triggering exceptions.

    Should normally be created through :meth:`BaseCursor.savepoint` rather
    than instantiated directly.

    On context exit, the savepoint is rolled back when an exception escaped
    the body and released ("committed") otherwise. Wrap the savepoint in
    ``contextlib.closing`` to roll it back unconditionally.

    The savepoint may also be closed explicitly inside the context body;
    this rolls back by default.

    :param BaseCursor cr: the cursor to execute the `SAVEPOINT` queries on
    """

    def __init__(self, cr: "_CursorProtocol"):
        # a uuid makes the savepoint name unique within the transaction
        self.name = str(uuid.uuid1())
        self._cr = cr
        self.closed: bool = False
        cr.execute('SAVEPOINT "%s"' % self.name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # roll back when the body raised, release otherwise
        self.close(rollback=exc_type is not None)

    def close(self, *, rollback: bool = True):
        """Release the savepoint (rolling back first by default); idempotent."""
        if self.closed:
            return
        self._close(rollback)

    def rollback(self):
        """Roll the transaction back to this savepoint, keeping it active."""
        self._cr.execute('ROLLBACK TO SAVEPOINT "%s"' % self.name)

    def _close(self, rollback: bool):
        if rollback:
            self.rollback()
        self._cr.execute('RELEASE SAVEPOINT "%s"' % self.name)
        self.closed = True
class _FlushingSavepoint(Savepoint):
    """Savepoint that flushes the cursor before taking the savepoint and on
    release, and clears pending changes when rolled back, so the savepoint
    always brackets a consistent database state."""

    def __init__(self, cr: BaseCursor):
        # push pending ORM changes to the database *before* taking the
        # savepoint, so a later rollback restores a well-defined state
        cr.flush()
        super().__init__(cr)

    def rollback(self):
        assert isinstance(self._cr, BaseCursor)
        # discard pending (unflushed) changes, they belong to the aborted part
        self._cr.clear()
        super().rollback()

    def _close(self, rollback: bool):
        assert isinstance(self._cr, BaseCursor)
        try:
            if not rollback:
                self._cr.flush()
        except Exception:
            # the flush failed: force a rollback before releasing, then
            # re-raise so the caller sees the original error
            rollback = True
            raise
        finally:
            # release (and possibly roll back) even when flush() raised
            super()._close(rollback)
# _CursorProtocol declares the available methods and type information,
# at runtime, it is just an `object`
class BaseCursor(_CursorProtocol):
    """ Base class for cursors that manage pre/post commit hooks. """
    IN_MAX = 1000  # decent limit on size of IN queries - guideline = Oracle limit

    # transaction object managing ORM environments; set by registry.cursor()
    transaction: Transaction | None
    # request-lifetime cache; see the docstring of :class:`Cursor`
    cache: dict[typing.Any, typing.Any]
    dbname: str

    def __init__(self) -> None:
        # hook collections run around commit()/rollback(), see flush(),
        # clear(), commit() and rollback() in the concrete cursor
        self.precommit = Callbacks()
        self.postcommit = Callbacks()
        self.prerollback = Callbacks()
        self.postrollback = Callbacks()
        # memoized transaction timestamp, see now()
        self._now: datetime | None = None
        self.cache = {}
        # By default a cursor has no transaction object. A transaction object
        # for managing environments is instantiated by registry.cursor(). It
        # is not done here in order to avoid cyclic module dependencies.
        self.transaction = None

    def flush(self) -> None:
        """ Flush the current transaction, and run precommit hooks. """
        # In case some pre-commit added another pre-commit or triggered changes
        # in the ORM, we must flush and run it again.
        for _ in range(10):  # limit number of iterations
            if self.transaction is not None:
                self.transaction.flush()
            if not self.precommit:
                break
            self.precommit.run()
        else:
            _logger.warning("Too many iterations for flushing the cursor!")

    def clear(self) -> None:
        """ Clear the current transaction, and clear precommit hooks. """
        if self.transaction is not None:
            self.transaction.clear()
        self.precommit.clear()

    def reset(self) -> None:
        """ Reset the current transaction (this invalidates more than clear()).
        This method should be called only right after commit() or rollback().
        """
        if self.transaction is not None:
            self.transaction.reset()

    def execute(self, query, params=None, log_exceptions: bool = True) -> None:
        """ Execute a query inside the current transaction.
        """
        raise NotImplementedError

    def commit(self) -> None:
        """ Commit the current transaction.
        """
        raise NotImplementedError

    def rollback(self) -> None:
        """ Rollback the current transaction.
        """
        raise NotImplementedError

    def savepoint(self, flush: bool = True) -> Savepoint:
        """context manager entering in a new savepoint

        With ``flush`` (the default), will automatically run (or clear) the
        relevant hooks.
        """
        if flush:
            return _FlushingSavepoint(self)
        else:
            return Savepoint(self)

    def __enter__(self):
        """ Using the cursor as a contextmanager automatically commits and
        closes it::

            with cr:
                cr.execute(...)

            # cr is committed if no failure occurred
            # cr is closed in any case
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is None:
                self.commit()
        finally:
            # close() is provided by the concrete cursor (e.g. Cursor.close)
            self.close()

    def dictfetchone(self) -> dict[str, typing.Any] | None:
        """ Return the first row as a dict (column_name -> value) or None if no rows are available. """
        raise NotImplementedError

    def dictfetchmany(self, size: int) -> list[dict[str, typing.Any]]:
        """ Return at most ``size`` rows as dicts (column_name -> value). """
        res: list[dict[str, typing.Any]] = []
        while size > 0 and (row := self.dictfetchone()) is not None:
            res.append(row)
            size -= 1
        return res

    def dictfetchall(self) -> list[dict[str, typing.Any]]:
        """ Return all rows as dicts (column_name -> value). """
        res: list[dict[str, typing.Any]] = []
        while (row := self.dictfetchone()) is not None:
            res.append(row)
        return res

    def split_for_in_conditions(self, ids: Iterable[T], size: int = 0) -> Iterator[tuple[T, ...]]:
        """Split a list of identifiers into one or more smaller tuples
        safe for IN conditions, after uniquifying them."""
        warnings.warn("Deprecated since 19.0, use split_every(cr.IN_MAX, ids)", DeprecationWarning)
        return tools.misc.split_every(size or self.IN_MAX, ids)

    def now(self) -> datetime:
        """ Return the transaction's timestamp ``NOW() AT TIME ZONE 'UTC'``. """
        if self._now is None:
            # memoized for the lifetime of the transaction; the concrete
            # cursor resets _now on commit() and rollback()
            self.execute("SELECT (now() AT TIME ZONE 'UTC')")
            row = self.fetchone()
            assert row
            self._now = row[0]
        return self._now
class Cursor(BaseCursor):
    """Represents an open transaction to the PostgreSQL DB backend,
    acting as a lightweight wrapper around psycopg2's
    ``cursor`` objects.

    ``Cursor`` is the object behind the ``cr`` variable used all
    over the OpenERP code.

    .. rubric:: Transaction Isolation

    One very important property of database transactions is the
    level of isolation between concurrent transactions.
    The SQL standard defines four levels of transaction isolation,
    ranging from the most strict *Serializable* level, to the least
    strict *Read Uncommitted* level. These levels are defined in
    terms of the phenomena that must not occur between concurrent
    transactions, such as *dirty read*, etc.
    In the context of a generic business data management software
    such as OpenERP, we need the best guarantees that no data
    corruption can ever be cause by simply running multiple
    transactions in parallel. Therefore, the preferred level would
    be the *serializable* level, which ensures that a set of
    transactions is guaranteed to produce the same effect as
    running them one at a time in some order.

    However, most database management systems implement a limited
    serializable isolation in the form of
    `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
    providing most of the same advantages as True Serializability,
    with a fraction of the performance cost.
    With PostgreSQL up to version 9.0, this snapshot isolation was
    the implementation of both the ``REPEATABLE READ`` and
    ``SERIALIZABLE`` levels of the SQL standard.
    As of PostgreSQL 9.1, the previous snapshot isolation implementation
    was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
    level was introduced, providing some additional heuristics to
    detect a concurrent update by parallel transactions, and forcing
    one of them to rollback.

    OpenERP implements its own level of locking protection
    for transactions that are highly likely to provoke concurrent
    updates, such as stock reservations or document sequences updates.
    Therefore we mostly care about the properties of snapshot isolation,
    but we don't really need additional heuristics to trigger transaction
    rollbacks, as we are taking care of triggering instant rollbacks
    ourselves when it matters (and we can save the additional performance
    hit of these heuristics).

    As a result of the above, we have selected ``REPEATABLE READ`` as
    the default transaction isolation level for OpenERP cursors, as
    it will be mapped to the desired ``snapshot isolation`` level for
    all supported PostgreSQL version (>10).

    .. attribute:: cache

        Cache dictionary with a "request" (-ish) lifecycle, only lives as
        long as the cursor itself does and proactively cleared when the
        cursor is closed.

        This cache should *only* be used to store repeatable reads as it
        ignores rollbacks and savepoints, it should not be used to store
        *any* data which may be modified during the life of the cursor.

    """
    # per-table statistics (count, cumulated time in µs), collected by
    # execute() at DEBUG level and dumped by print_log()
    sql_from_log: dict[str, tuple[int, float]]
    sql_into_log: dict[str, tuple[int, float]]
    sql_log_count: int

    def __init__(self, pool: ConnectionPool, dbname: str, dsn: dict):
        super().__init__()
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log_count = 0

        # avoid the call of close() (by __del__) if an exception
        # is raised by any of the following initializations
        self._closed: bool = True

        self.__pool: ConnectionPool = pool
        self.dbname = dbname

        self._cnx: PsycoConnection = pool.borrow(dsn)
        self._obj: psycopg2.extensions.cursor = self._cnx.cursor()
        if _logger.isEnabledFor(logging.DEBUG):
            # remember where the cursor was created, for __del__ diagnostics
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self._closed = False  # real initialization value
        # See the docstring of this class.
        self.connection.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
        self.connection.set_session(readonly=pool.readonly)

        if os.getenv('ODOO_FAKETIME_TEST_MODE') and self.dbname in tools.config['db_name']:
            self.execute("SET search_path = public, pg_catalog;")
            self.commit()  # ensure that the search_path remains after a rollback

    def __build_dict(self, row: tuple) -> dict[str, typing.Any]:
        # map a raw result row to {column_name: value} using the cursor's
        # result description
        description = self._obj.description
        assert description, "Query does not have results"
        return {column.name: row[index] for index, column in enumerate(description)}

    def dictfetchone(self) -> dict[str, typing.Any] | None:
        row = self._obj.fetchone()
        return self.__build_dict(row) if row else None

    def dictfetchmany(self, size) -> list[dict[str, typing.Any]]:
        return [self.__build_dict(row) for row in self._obj.fetchmany(size)]

    def dictfetchall(self) -> list[dict[str, typing.Any]]:
        return [self.__build_dict(row) for row in self._obj.fetchall()]

    def __del__(self):
        if not self._closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)

    def _format(self, query, params=None) -> str:
        """Render the query with its parameters, decoded for logging."""
        encoding = psycopg2.extensions.encodings[self.connection.encoding]
        return self.mogrify(query, params).decode(encoding, 'replace')

    def mogrify(self, query, params=None) -> bytes:
        """Return the query as it would be sent to the server, as bytes."""
        if isinstance(query, SQL):
            assert params is None, "Unexpected parameters for SQL query object"
            query, params = query.code, query.params
        return self._obj.mogrify(query, params)

    def execute(self, query, params=None, log_exceptions: bool = True) -> None:
        """ Execute ``query`` (a string or :class:`SQL` object) inside the
        current transaction, collecting timing/count statistics.

        :param params: query parameters (tuple, list or dict); must be None
            when ``query`` is an :class:`SQL` object (it carries its own)
        :param log_exceptions: log failing queries at ERROR level before
            re-raising
        """
        global sql_counter

        if isinstance(query, SQL):
            assert params is None, "Unexpected parameters for SQL query object"
            query, params = query.code, query.params

        if params and not isinstance(params, (tuple, list, dict)):
            # psycopg2's TypeError is not clear if you mess up the params
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

        start = real_time()
        try:
            self._obj.execute(query, params)
        except Exception as e:
            if log_exceptions:
                _logger.error("bad query: %s\nERROR: %s", self._obj.query or query, e)
            raise
        finally:
            delay = real_time() - start
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug("[%.3f ms] query: %s", 1000 * delay, self._format(query, params))

        # simple query count is always computed
        self.sql_log_count += 1
        sql_counter += 1

        current_thread = threading.current_thread()
        if hasattr(current_thread, 'query_count'):
            current_thread.query_count += 1
        if hasattr(current_thread, 'query_time'):
            current_thread.query_time += delay

        # optional hooks for performance and tracing analysis
        for hook in getattr(current_thread, 'query_hooks', ()):
            hook(self, query, params, start, delay)

        # advanced stats
        if _logger.isEnabledFor(logging.DEBUG):
            if obj_query := self._obj.query:
                query = obj_query.decode()
            query_type, table = categorize_query(query)
            log_target = None
            if query_type == 'into':
                log_target = self.sql_into_log
            elif query_type == 'from':
                log_target = self.sql_from_log
            if log_target:
                stat_count, stat_time = log_target.get(table or '', (0, 0))
                log_target[table or ''] = (stat_count + 1, stat_time + delay * 1E6)
        return None

    def execute_values(self, query, argslist, template=None, page_size=100, fetch=False):
        """
        A proxy for psycopg2.extras.execute_values which can log all queries like execute.
        But this method cannot set log_exceptions=False like execute
        """
        # Odoo Cursor only proxies all methods of psycopg2 Cursor. This is a patch for problems caused by passing
        # self instead of self._obj to the first parameter of psycopg2.extras.execute_values.
        if isinstance(query, Composable):
            query = query.as_string(self._obj)
        return psycopg2.extras.execute_values(self, query, argslist, template=template, page_size=page_size, fetch=fetch)

    def print_log(self) -> None:
        """Dump and reset the per-table SQL statistics (DEBUG level only)."""
        global sql_counter

        if not _logger.isEnabledFor(logging.DEBUG):
            return

        def process(log_type: str):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sqllog = sqllogs[log_type]
            total = 0.0
            if sqllog:
                _logger.debug("SQL LOG %s:", log_type)
                for table, (stat_count, stat_time) in sorted(sqllog.items(), key=lambda k: k[1]):
                    delay = timedelta(microseconds=stat_time)
                    _logger.debug("table: %s: %s/%s", table, delay, stat_count)
                    total += stat_time
                sqllog.clear()
            total_delay = timedelta(microseconds=total)
            _logger.debug("SUM %s:%s/%d [%d]", log_type, total_delay, self.sql_log_count, sql_counter)

        process('from')
        process('into')
        self.sql_log_count = 0

    @contextmanager
    def _enable_logging(self):
        """ Forcefully enables logging for this cursor, restores it afterwards.

        Updates the logger in-place, so not thread-safe.
        """
        level = _logger.level
        _logger.setLevel(logging.DEBUG)
        try:
            yield
        finally:
            _logger.setLevel(level)

    def close(self) -> None:
        """Close the cursor and return its connection to the pool."""
        if not self.closed:
            return self._close(False)

    def _close(self, leak: bool = False) -> None:
        if not self._obj:
            return

        self.cache.clear()

        # advanced stats only at logging.DEBUG level
        self.print_log()

        self._obj.close()

        # This force the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part because browse records keep a reference to the cursor.
        del self._obj

        # Clean the underlying connection, and run rollback hooks.
        self.rollback()

        self._closed = True

        if leak:
            self._cnx.leaked = True  # type: ignore
        else:
            # template databases must not be kept open, otherwise they cannot
            # be used as CREATE DATABASE templates
            chosen_template = tools.config['db_template']
            keep_in_pool = self.dbname not in ('template0', 'template1', 'postgres', chosen_template)
            self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)

    def commit(self) -> None:
        """ Perform an SQL `COMMIT` """
        self.flush()
        self._cnx.commit()
        self.clear()
        self._now = None
        # rollback hooks are obsolete once the transaction committed
        self.prerollback.clear()
        self.postrollback.clear()
        self.postcommit.run()

    def rollback(self) -> None:
        """ Perform an SQL `ROLLBACK` """
        self.clear()
        # commit hooks are obsolete once the transaction rolled back
        self.postcommit.clear()
        self.prerollback.run()
        self._cnx.rollback()
        self._now = None
        self.postrollback.run()

    def __getattr__(self, name):
        # delegate everything else to the underlying psycopg2 cursor
        if self._closed and name == '_obj':
            raise psycopg2.InterfaceError("Cursor already closed")
        return getattr(self._obj, name)

    @property
    def closed(self) -> bool:
        return self._closed or bool(self._cnx.closed)

    @property
    def readonly(self) -> bool:
        return bool(self._cnx.readonly)
class PsycoConnection(psycopg2.extensions.connection):
    """psycopg2 connection subclass with pool bookkeeping and hardening."""
    # pool bookkeeping flags, managed by ConnectionPool.borrow()/give_back()
    _pool_in_use: bool = False
    _pool_last_used: float = 0

    def lobject(*args, **kwargs):
        # disable large-object support: always returns None
        pass

    if hasattr(psycopg2.extensions, 'ConnectionInfo'):
        @property
        def info(self):
            # wrap the standard ConnectionInfo to hide the password
            class PsycoConnectionInfo(psycopg2.extensions.ConnectionInfo):
                @property
                def password(self):
                    # never expose the password (always returns None)
                    pass
            return PsycoConnectionInfo(self)
class ConnectionPool:
    """ The pool of connections to database(s)

    Keep a set of connections to pg databases open, and reuse them
    to open cursors for all transactions.

    The connections are *not* automatically closed. Only a close_db()
    can trigger that.
    """
    _connections: list[PsycoConnection]

    def __init__(self, maxconn: int = 64, readonly: bool = False):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._readonly = readonly
        # guards _connections; see the @locked methods below
        self._lock = threading.Lock()

    def __repr__(self):
        used = sum(1 for c in self._connections if c._pool_in_use)
        count = len(self._connections)
        mode = 'read-only' if self._readonly else 'read/write'
        return f"ConnectionPool({mode};used={used}/count={count}/max={self._maxconn})"

    @property
    def readonly(self) -> bool:
        return self._readonly

    def _debug(self, msg: str, *args):
        # prefix every pool log line with the pool's repr
        _logger_conn.debug(('%r ' + msg), self, *args)

    @locked
    def borrow(self, connection_info: dict) -> PsycoConnection:
        """
        Borrow a PsycoConnection from the pool. If no connection is available, create a new one
        as long as there are still slots available. Perform some garbage-collection in the pool:
        idle, dead and leaked connections are removed.

        :param dict connection_info: dict of psql connection keywords
        :rtype: PsycoConnection
        """
        # free idle, dead and leaked connections (reverse order so pop(i)
        # does not shift indices still to be visited)
        for i, cnx in tools.reverse_enumerate(self._connections):
            if not cnx._pool_in_use and not cnx.closed and time.time() - cnx._pool_last_used > MAX_IDLE_TIMEOUT:
                self._debug('Close connection at index %d: %r', i, cnx.dsn)
                cnx.close()
            # re-check after the possible close() above
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                cnx._pool_in_use = False
                _logger.info('%r: Free leaked connection to %r', self, cnx.dsn)

        # try to reuse a free connection to the same dsn
        for i, cnx in enumerate(self._connections):
            if not cnx._pool_in_use and self._dsn_equals(cnx.dsn, connection_info):
                try:
                    cnx.reset()
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                    if not cnx.closed:
                        cnx.close()
                    continue
                cnx._pool_in_use = True
                self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i)

                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, cnx in enumerate(self._connections):
                if not cnx._pool_in_use:
                    self._connections.pop(i)
                    if not cnx.closed:
                        cnx.close()
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(
                connection_factory=PsycoConnection,
                **connection_info)
        except psycopg2.Error:
            _logger.info('Connection to the database failed')
            raise
        # server_version is e.g. 140005 for 14.5, hence the * 10000
        if result.server_version < MIN_PG_VERSION * 10000:
            warnings.warn(f"Postgres version is {result.server_version}, lower than minimum required {MIN_PG_VERSION * 10000}")
        result._pool_in_use = True
        self._connections.append(result)
        self._debug('Create new connection backend PID %d', result.get_backend_pid())

        return result

    @locked
    def give_back(self, connection: PsycoConnection, keep_in_pool: bool = True):
        """Return ``connection`` to the pool, or discard it when
        ``keep_in_pool`` is false."""
        self._debug('Give back connection to %r', connection.dsn)
        try:
            index = self._connections.index(connection)
        except ValueError:
            raise PoolError('This connection does not belong to the pool')

        if keep_in_pool:
            # Release the connection and record the last time used
            connection._pool_in_use = False
            connection._pool_last_used = time.time()
            self._debug('Put connection to %r in pool', connection.dsn)
        else:
            cnx = self._connections.pop(index)
            self._debug('Forgot connection to %r', cnx.dsn)
            cnx.close()

    @locked
    def close_all(self, dsn: dict | str | None = None):
        """Close all pooled connections, or only those matching ``dsn``."""
        count = 0
        last = None
        for i, cnx in tools.reverse_enumerate(self._connections):
            if dsn is None or self._dsn_equals(cnx.dsn, dsn):
                cnx.close()
                last = self._connections.pop(i)
                count += 1
        if count:
            _logger.info('%r: Closed %d connections %s', self, count,
                         (dsn and last and 'to %r' % last.dsn) or '')

    def _dsn_equals(self, dsn1: dict | str, dsn2: dict | str) -> bool:
        """Compare two dsn (string or keyword dict), ignoring the password
        and treating 'dbname' and 'database' as the same key."""
        alias_keys = {'dbname': 'database'}
        ignore_keys = ['password']
        dsn1, dsn2 = ({
            alias_keys.get(key, key): str(value)
            for key, value in (psycopg2.extensions.parse_dsn(dsn) if isinstance(dsn, str) else dsn).items()
            if key not in ignore_keys
        } for dsn in (dsn1, dsn2))
        return dsn1 == dsn2
class Connection:
    """ A lightweight handle on a postgres database, from which cursors
    (transactions) can be opened.
    """

    def __init__(self, pool: "ConnectionPool", dbname: str, dsn: dict):
        self.__dbname = dbname
        self.__dsn = dsn
        self.__pool = pool

    @property
    def dsn(self) -> dict:
        # expose a copy of the connection parameters, without the password
        sanitized = dict(self.__dsn)
        sanitized.pop('password', None)
        return sanitized

    @property
    def dbname(self) -> str:
        return self.__dbname

    def cursor(self) -> "Cursor":
        _logger.debug('create cursor to %r', self.dsn)
        return Cursor(self.__pool, self.__dbname, self.__dsn)

    def __bool__(self):
        # a Connection has no meaningful truth value; fail loudly instead
        raise NotImplementedError()
def connection_info_for(db_or_uri: str, readonly=False) -> tuple[str, dict]:
    """ parse the given `db_or_uri` and return a 2-tuple (dbname, connection_params)

    Connection params are either a dictionary with a single key ``dsn``
    containing a connection URI, or a dictionary containing connection
    parameter keywords which psycopg2 can build a key/value connection string
    (dsn) from

    :param str db_or_uri: database name or postgres dsn
    :param bool readonly: used to load
        the default configuration from ``db_`` or ``db_replica_``.
    :rtype: (str, dict)
    """
    app_name = config['db_app_name']
    if 'ODOO_PGAPPNAME' in os.environ:
        warnings.warn("Since 19.0, use PGAPPNAME instead of ODOO_PGAPPNAME", DeprecationWarning)
        app_name = os.environ['ODOO_PGAPPNAME']
    # Using manual string interpolation for security reason and trimming at default NAMEDATALEN=63
    app_name = app_name.replace('{pid}', str(os.getpid()))[:63]

    if db_or_uri.startswith(('postgresql://', 'postgres://')):
        # extract the database name from the URI: path first, then user,
        # then host as fallbacks
        us = urls.url_parse(db_or_uri)  # type: ignore
        db_name = us.path[1:] if len(us.path) > 1 else (us.username or us.hostname)
        return db_name, {'dsn': db_or_uri, 'application_name': app_name}

    connection_info = {'database': db_or_uri, 'application_name': app_name}
    for param in ('host', 'port', 'user', 'password', 'sslmode'):
        value = tools.config['db_' + param]
        if readonly:
            # prefer the replica-specific setting when connecting read-only
            value = tools.config.get('db_replica_' + param) or value
        if value:
            connection_info[param] = value

    return db_or_uri, connection_info
# Process-wide connection pools, lazily created by db_connect():
# one read/write pool and one read-only (replica) pool.
_Pool: ConnectionPool | None = None
_Pool_readonly: ConnectionPool | None = None
def db_connect(to: str, allow_uri=False, readonly=False) -> Connection:
    """ Return a :class:`Connection` to the given database, lazily creating
    the process-wide connection pool on first use.

    :param to: database name, or a postgres URI when ``allow_uri`` is set
    :param allow_uri: allow ``to`` to be a full connection URI
    :param readonly: use the read-only (replica) pool and configuration
    :raises ValueError: if ``to`` is a URI and ``allow_uri`` is false
    """
    global _Pool, _Pool_readonly  # noqa: PLW0603 (global-statement)

    maxconn = (tools.config['db_maxconn_gevent'] if hasattr(odoo, 'evented') and odoo.evented else 0) or tools.config['db_maxconn']
    # fix: dropped a stray no-op expression statement that evaluated
    # `_Pool_readonly if readonly else _Pool` without using the result
    if readonly:
        if _Pool_readonly is None:
            _Pool_readonly = ConnectionPool(int(maxconn), readonly=True)
        pool = _Pool_readonly
    else:
        if _Pool is None:
            _Pool = ConnectionPool(int(maxconn), readonly=False)
        pool = _Pool

    db, info = connection_info_for(to, readonly)
    if not allow_uri and db != to:
        raise ValueError('URI connections not allowed')
    return Connection(pool, db, info)
def close_db(db_name: str) -> None:
    """ You might want to call odoo.modules.registry.Registry.delete(db_name) along this function."""
    # drop pooled connections to this database from both pools
    for pool in (_Pool, _Pool_readonly):
        if pool:
            pool.close_all(connection_info_for(db_name)[1])
def close_all() -> None:
    """Close every pooled connection, in both pools."""
    for pool in (_Pool, _Pool_readonly):
        if pool:
            pool.close_all()