Coverage for adhoc-cicd-odoo-odoo / odoo / tools / query.py: 85%
119 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1# Part of Odoo. See LICENSE file for full copyright and licensing details.
2import itertools
3from collections.abc import Iterable, Iterator
5from .sql import SQL, make_identifier
8def _sql_from_table(alias: str, table: SQL) -> SQL:
9 """ Return a FROM clause element from ``alias`` and ``table``. """
10 if (alias_identifier := SQL.identifier(alias)) == table:
11 return table
12 return SQL("%s AS %s", table, alias_identifier)
15def _sql_from_join(kind: SQL, alias: str, table: SQL, condition: SQL) -> SQL:
16 """ Return a FROM clause element for a JOIN. """
17 return SQL("%s %s ON (%s)", kind, _sql_from_table(alias, table), condition)
20_SQL_JOINS = {
21 "JOIN": SQL("JOIN"),
22 "LEFT JOIN": SQL("LEFT JOIN"),
23}
26def _generate_table_alias(src_table_alias: str, link: str) -> str:
27 """ Generate a standard table alias name. An alias is generated as following:
29 - the base is the source table name (that can already be an alias)
30 - then, the joined table is added in the alias using a 'link field name'
31 that is used to render unique aliases for a given path
32 - the name is shortcut if it goes beyond PostgreSQL's identifier limits
34 .. code-block:: pycon
36 >>> _generate_table_alias('res_users', link='parent_id')
37 'res_users__parent_id'
39 :param str src_table_alias: alias of the source table
40 :param str link: field name
41 :return str: alias
42 """
43 return make_identifier(f"{src_table_alias}__{link}")
46class Query:
47 """ Simple implementation of a query object, managing tables with aliases,
48 join clauses (with aliases, condition and parameters), where clauses (with
49 parameters), order, limit and offset.
51 :param env: model environment (for lazy evaluation)
52 :param alias: name or alias of the table
53 :param table: a table expression (``str`` or ``SQL`` object), optional
54 """
56 def __init__(self, env, alias: str, table: (SQL | None) = None):
57 # database cursor
58 self._env = env
60 self._tables: dict[str, SQL] = {
61 alias: table if table is not None else SQL.identifier(alias),
62 }
64 # joins {alias: (kind(SQL), table(SQL), condition(SQL))}
65 self._joins: dict[str, tuple[SQL, SQL, SQL]] = {}
67 # holds the list of WHERE conditions (to be joined with 'AND')
68 self._where_clauses: list[SQL] = []
70 # groupby, having, order, limit, offset
71 self.groupby: SQL | None = None
72 self._order_groupby: list[SQL] = []
73 self.having: SQL | None = None
74 self._order: SQL | None = None
75 self.limit: int | None = None
76 self.offset: int | None = None
78 # memoized result
79 self._ids: tuple[int, ...] | None = None
81 @staticmethod
82 def make_alias(alias: str, link: str) -> str:
83 """ Return an alias based on ``alias`` and ``link``. """
84 return _generate_table_alias(alias, link)
86 def add_table(self, alias: str, table: (SQL | None) = None):
87 """ Add a table with a given alias to the from clause. """
88 assert alias not in self._tables and alias not in self._joins, f"Alias {alias!r} already in {self}"
89 self._tables[alias] = table if table is not None else SQL.identifier(alias)
90 self._ids = self._ids and None
92 def add_join(self, kind: str, alias: str, table: str | SQL | None, condition: SQL):
93 """ Add a join clause with the given alias, table and condition. """
94 sql_kind = _SQL_JOINS.get(kind.upper())
95 assert sql_kind is not None, f"Invalid JOIN type {kind!r}"
96 assert alias not in self._tables, f"Alias {alias!r} already used"
97 table = table or alias
98 if isinstance(table, str):
99 table = SQL.identifier(table)
101 if alias in self._joins:
102 assert self._joins[alias] == (sql_kind, table, condition)
103 else:
104 self._joins[alias] = (sql_kind, table, condition)
105 self._ids = self._ids and None
107 def add_where(self, where_clause: str | SQL, where_params=()):
108 """ Add a condition to the where clause. """
109 self._where_clauses.append(SQL(where_clause, *where_params)) # pylint: disable = sql-injection
110 self._ids = self._ids and None
112 def join(self, lhs_alias: str, lhs_column: str, rhs_table: str | SQL, rhs_column: str, link: str) -> str:
113 """
114 Perform a join between a table already present in the current Query object and
115 another table. This method is essentially a shortcut for methods :meth:`~.make_alias`
116 and :meth:`~.add_join`.
118 :param str lhs_alias: alias of a table already defined in the current Query object.
119 :param str lhs_column: column of `lhs_alias` to be used for the join's ON condition.
120 :param str rhs_table: name of the table to join to `lhs_alias`.
121 :param str rhs_column: column of `rhs_alias` to be used for the join's ON condition.
122 :param str link: used to generate the alias for the joined table, this string should
123 represent the relationship (the link) between both tables.
124 """
125 assert lhs_alias in self._tables or lhs_alias in self._joins, "Alias %r not in %s" % (lhs_alias, str(self))
126 rhs_alias = self.make_alias(lhs_alias, link)
127 condition = SQL("%s = %s", SQL.identifier(lhs_alias, lhs_column), SQL.identifier(rhs_alias, rhs_column))
128 self.add_join('JOIN', rhs_alias, rhs_table, condition)
129 return rhs_alias
131 def left_join(self, lhs_alias: str, lhs_column: str, rhs_table: str, rhs_column: str, link: str) -> str:
132 """ Add a LEFT JOIN to the current table (if necessary), and return the
133 alias corresponding to ``rhs_table``.
135 See the documentation of :meth:`join` for a better overview of the
136 arguments and what they do.
137 """
138 assert lhs_alias in self._tables or lhs_alias in self._joins, "Alias %r not in %s" % (lhs_alias, str(self))
139 rhs_alias = self.make_alias(lhs_alias, link)
140 condition = SQL("%s = %s", SQL.identifier(lhs_alias, lhs_column), SQL.identifier(rhs_alias, rhs_column))
141 self.add_join('LEFT JOIN', rhs_alias, rhs_table, condition)
142 return rhs_alias
144 @property
145 def order(self) -> SQL | None:
146 return self._order
148 @order.setter
149 def order(self, value: SQL | str | None):
150 self._order = SQL(value) if value is not None else None # pylint: disable = sql-injection
152 @property
153 def table(self) -> str:
154 """ Return the query's main table, i.e., the first one in the FROM clause. """
155 return next(iter(self._tables))
157 @property
158 def from_clause(self) -> SQL:
159 """ Return the FROM clause of ``self``, without the FROM keyword. """
160 tables = SQL(", ").join(itertools.starmap(_sql_from_table, self._tables.items()))
161 if not self._joins:
162 return tables
163 items = (
164 tables,
165 *(
166 _sql_from_join(kind, alias, table, condition)
167 for alias, (kind, table, condition) in self._joins.items()
168 ),
169 )
170 return SQL(" ").join(items)
172 @property
173 def where_clause(self) -> SQL:
174 """ Return the WHERE condition of ``self``, without the WHERE keyword. """
175 return SQL(" AND ").join(self._where_clauses)
177 def is_empty(self) -> bool:
178 """ Return whether the query is known to return nothing. """
179 return self._ids == ()
181 def select(self, *args: str | SQL) -> SQL:
182 """ Return the SELECT query as an ``SQL`` object. """
183 sql_args = map(SQL, args) if args else [SQL.identifier(self.table, 'id')]
184 return SQL(
185 "%s%s%s%s%s%s%s%s",
186 SQL("SELECT %s", SQL(", ").join(sql_args)),
187 SQL(" FROM %s", self.from_clause),
188 SQL(" WHERE %s", self.where_clause) if self._where_clauses else SQL(),
189 SQL(" GROUP BY %s", self.groupby) if self.groupby else SQL(),
190 SQL(" HAVING %s", self.having) if self.having else SQL(),
191 SQL(" ORDER BY %s", self._order) if self._order else SQL(),
192 SQL(" LIMIT %s", self.limit) if self.limit else SQL(),
193 SQL(" OFFSET %s", self.offset) if self.offset else SQL(),
194 )
196 def subselect(self, *args: str | SQL) -> SQL:
197 """ Similar to :meth:`.select`, but for sub-queries.
198 This one avoids the ORDER BY clause when possible,
199 and includes parentheses around the subquery.
200 """
201 if self._ids is not None and not args:
202 # inject the known result instead of the subquery
203 if not self._ids:
204 # in case we have nothing, we want to use a sub_query with no records
205 # because an empty tuple leads to a syntax error
206 # and a tuple containing just None creates issues for `NOT IN`
207 return SQL("(SELECT 1 WHERE FALSE)")
208 return SQL("%s", self._ids)
210 if self.limit or self.offset: 210 ↛ 212line 210 didn't jump to line 212 because the condition on line 210 was never true
211 # in this case, the ORDER BY clause is necessary
212 return SQL("(%s)", self.select(*args))
214 sql_args = map(SQL, args) if args else [SQL.identifier(self.table, 'id')]
215 return SQL(
216 "(%s%s%s)",
217 SQL("SELECT %s", SQL(", ").join(sql_args)),
218 SQL(" FROM %s", self.from_clause),
219 SQL(" WHERE %s", self.where_clause) if self._where_clauses else SQL(),
220 )
222 def get_result_ids(self) -> tuple[int, ...]:
223 """ Return the result of ``self.select()`` as a tuple of ids. The result
224 is memoized for future use, which avoids making the same query twice.
225 """
226 if self._ids is None:
227 self._ids = tuple(id_ for id_, in self._env.execute_query(self.select()))
228 return self._ids
230 def set_result_ids(self, ids: Iterable[int], ordered: bool = True) -> None:
231 """ Set up the query to return the lines given by ``ids``. The parameter
232 ``ordered`` tells whether the query must be ordered to match exactly the
233 sequence ``ids``.
234 """
235 assert not (self._joins or self._where_clauses or self.limit or self.offset), \
236 "Method set_result_ids() can only be called on a virgin Query"
237 ids = tuple(ids)
238 if not ids:
239 self.add_where("FALSE")
240 elif ordered: 240 ↛ 248line 240 didn't jump to line 248 because the condition on line 240 was never true
241 # This guarantees that self.select() returns the results in the
242 # expected order of ids:
243 # SELECT "stuff".id
244 # FROM "stuff"
245 # JOIN (SELECT * FROM unnest(%s) WITH ORDINALITY) AS "stuff__ids"
246 # ON ("stuff"."id" = "stuff__ids"."unnest")
247 # ORDER BY "stuff__ids"."ordinality"
248 alias = self.join(
249 self.table, 'id',
250 SQL('(SELECT * FROM unnest(%s) WITH ORDINALITY)', list(ids)), 'unnest',
251 'ids',
252 )
253 self.order = SQL.identifier(alias, 'ordinality')
254 else:
255 self.add_where(SQL("%s IN %s", SQL.identifier(self.table, 'id'), ids))
256 self._ids = ids
258 def __str__(self) -> str:
259 sql = self.select()
260 return f"<Query: {sql.code!r} with params: {sql.params!r}>"
262 def __bool__(self):
263 return bool(self.get_result_ids())
265 def __len__(self) -> int:
266 if self._ids is None:
267 if self.limit or self.offset:
268 # optimization: generate a SELECT FROM, and then count the rows
269 sql = SQL("SELECT COUNT(*) FROM (%s) t", self.select(""))
270 else:
271 sql = self.select('COUNT(*)')
272 return self._env.execute_query(sql)[0][0]
273 return len(self.get_result_ids())
275 def __iter__(self) -> Iterator[int]:
276 return iter(self.get_result_ids())