Coverage for adhoc-cicd-odoo-odoo / odoo / tools / query.py: 85%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# Part of Odoo. See LICENSE file for full copyright and licensing details. 

2import itertools 

3from collections.abc import Iterable, Iterator 

4 

5from .sql import SQL, make_identifier 

6 

7 

def _sql_from_table(alias: str, table: SQL) -> SQL:
    """ Build the FROM clause element for ``table`` known under ``alias``.

    When ``table`` is already equal to the identifier made from ``alias``, it
    is returned unchanged; otherwise the result is ``<table> AS <alias>``.
    """
    alias_identifier = SQL.identifier(alias)
    return table if alias_identifier == table else SQL("%s AS %s", table, alias_identifier)

13 

14 

def _sql_from_join(kind: SQL, alias: str, table: SQL, condition: SQL) -> SQL:
    """ Build the FROM clause element of a JOIN: ``<kind> <table> ON (<condition>)``.

    The joined table is rendered by :func:`_sql_from_table`, which adds the
    ``AS <alias>`` part when needed.
    """
    joined_table = _sql_from_table(alias, table)
    return SQL("%s %s ON (%s)", kind, joined_table, condition)

18 

19 

# JOIN kinds supported by ``Query.add_join``, keyed by their upper-case keyword
# (callers' ``kind`` is upper-cased before the lookup) and mapped to the SQL
# fragment that is injected in the FROM clause.
_SQL_JOINS: dict[str, SQL] = {
    "JOIN": SQL("JOIN"),
    "LEFT JOIN": SQL("LEFT JOIN"),
}

24 

25 

def _generate_table_alias(src_table_alias: str, link: str) -> str:
    """ Generate the standard alias of a table reached from another table.

    The alias is built as follows:

    - it starts with the alias of the source table (which may itself be a
      generated alias);
    - the 'link field name' leading to the joined table is appended after a
      double underscore, which makes aliases unique for a given path;
    - the name is shortened if it goes beyond PostgreSQL's identifier limits.

    .. code-block:: pycon

        >>> _generate_table_alias('res_users', link='parent_id')
        'res_users__parent_id'

    :param str src_table_alias: alias of the source table
    :param str link: field name
    :return str: alias
    """
    full_alias = f"{src_table_alias}__{link}"
    return make_identifier(full_alias)

44 

45 

class Query:
    """ Simple implementation of a query object, managing tables with aliases,
    join clauses (with aliases, condition and parameters), where clauses (with
    parameters), order, limit and offset.

    The ids returned by the query are memoized once fetched (or once given by
    :meth:`set_result_ids`); methods that modify the query drop that memo.

    :param env: model environment (for lazy evaluation); its ``execute_query``
        method is used to run the query
    :param alias: name or alias of the main table
    :param table: table expression (``SQL`` object) for the main table,
        optional; defaults to the identifier ``alias``
    """

    def __init__(self, env, alias: str, table: (SQL | None) = None):
        # environment used to execute the query lazily
        self._env = env

        # tables of the FROM clause {alias: table(SQL)}; the first one is the
        # query's main table
        self._tables: dict[str, SQL] = {
            alias: table if table is not None else SQL.identifier(alias),
        }

        # joins {alias: (kind(SQL), table(SQL), condition(SQL))}
        self._joins: dict[str, tuple[SQL, SQL, SQL]] = {}

        # holds the list of WHERE conditions (to be joined with 'AND')
        self._where_clauses: list[SQL] = []

        # groupby, having, order, limit, offset
        self.groupby: SQL | None = None
        self._order_groupby: list[SQL] = []
        self.having: SQL | None = None
        self._order: SQL | None = None
        self.limit: int | None = None
        self.offset: int | None = None

        # memoized result
        self._ids: tuple[int, ...] | None = None

    def _invalidate_ids(self) -> None:
        """ Drop the memoized result after the query has been modified.

        A memoized *empty* result is deliberately kept: ``() and None``
        evaluates to ``()``, whereas ``None`` and any non-empty tuple both end
        up as ``None``. Presumably this is because adding tables, joins or
        AND-ed conditions to a query without rows cannot produce rows.
        """
        self._ids = self._ids and None

    @staticmethod
    def make_alias(alias: str, link: str) -> str:
        """ Return an alias based on ``alias`` and ``link``. """
        return _generate_table_alias(alias, link)

    def add_table(self, alias: str, table: (SQL | None) = None) -> None:
        """ Add a table with a given alias to the from clause.

        :param alias: alias of the table; must not be used yet by a table or a
            join of the query
        :param table: table expression, defaults to the identifier ``alias``
        """
        assert alias not in self._tables and alias not in self._joins, f"Alias {alias!r} already in {self}"
        self._tables[alias] = table if table is not None else SQL.identifier(alias)
        self._invalidate_ids()

    def add_join(self, kind: str, alias: str, table: str | SQL | None, condition: SQL) -> None:
        """ Add a join clause with the given alias, table and condition.

        Adding again a join that is identical to an existing one (same kind,
        table and condition for the same alias) is accepted and does nothing.

        :param kind: ``'JOIN'`` or ``'LEFT JOIN'`` (case-insensitive)
        :param alias: alias of the joined table; must not be the alias of a
            table of the FROM clause
        :param table: joined table, as a name or an ``SQL`` expression; a
            falsy value means the table named ``alias``
        :param condition: the ON condition of the join
        """
        sql_kind = _SQL_JOINS.get(kind.upper())
        assert sql_kind is not None, f"Invalid JOIN type {kind!r}"
        assert alias not in self._tables, f"Alias {alias!r} already used"
        table = table or alias
        if isinstance(table, str):
            table = SQL.identifier(table)

        if alias in self._joins:
            # the query is left untouched, hence the memoized result is kept
            assert self._joins[alias] == (sql_kind, table, condition), \
                f"Alias {alias!r} already used by a different join"
        else:
            self._joins[alias] = (sql_kind, table, condition)
            self._invalidate_ids()

    def add_where(self, where_clause: str | SQL, where_params=()) -> None:
        """ Add a condition to the where clause.

        :param where_clause: the condition, combined with the existing ones
            using ``AND``
        :param where_params: the parameters of ``where_clause``, if any
        """
        self._where_clauses.append(SQL(where_clause, *where_params))  # pylint: disable = sql-injection
        self._invalidate_ids()

    def _join_on_columns(
        self, kind: str, lhs_alias: str, lhs_column: str, rhs_table: str | SQL, rhs_column: str, link: str,
    ) -> str:
        """ Add a join of the given ``kind`` on the condition
        ``lhs_alias.lhs_column = rhs_alias.rhs_column``, and return the alias
        generated for ``rhs_table``. Shared by :meth:`join` and :meth:`left_join`.
        """
        assert lhs_alias in self._tables or lhs_alias in self._joins, "Alias %r not in %s" % (lhs_alias, str(self))
        rhs_alias = self.make_alias(lhs_alias, link)
        condition = SQL("%s = %s", SQL.identifier(lhs_alias, lhs_column), SQL.identifier(rhs_alias, rhs_column))
        self.add_join(kind, rhs_alias, rhs_table, condition)
        return rhs_alias

    def join(self, lhs_alias: str, lhs_column: str, rhs_table: str | SQL, rhs_column: str, link: str) -> str:
        """
        Perform a join between a table already present in the current Query object and
        another table. This method is essentially a shortcut for methods :meth:`~.make_alias`
        and :meth:`~.add_join`.

        :param str lhs_alias: alias of a table already defined in the current Query object.
        :param str lhs_column: column of `lhs_alias` to be used for the join's ON condition.
        :param rhs_table: name (or ``SQL`` expression) of the table to join to `lhs_alias`.
        :param str rhs_column: column of the joined table to be used for the join's ON condition.
        :param str link: used to generate the alias for the joined table, this string should
            represent the relationship (the link) between both tables.
        :return: the alias of the joined table
        """
        return self._join_on_columns('JOIN', lhs_alias, lhs_column, rhs_table, rhs_column, link)

    def left_join(self, lhs_alias: str, lhs_column: str, rhs_table: str | SQL, rhs_column: str, link: str) -> str:
        """ Add a LEFT JOIN to the current table (if necessary), and return the
        alias corresponding to ``rhs_table``.

        See the documentation of :meth:`join` for a better overview of the
        arguments and what they do.
        """
        return self._join_on_columns('LEFT JOIN', lhs_alias, lhs_column, rhs_table, rhs_column, link)

    @property
    def order(self) -> SQL | None:
        """ The ORDER BY expression of the query (without keyword), or ``None``. """
        return self._order

    @order.setter
    def order(self, value: SQL | str | None) -> None:
        # strings are wrapped into SQL objects, None removes the ordering
        self._order = SQL(value) if value is not None else None  # pylint: disable = sql-injection

    @property
    def table(self) -> str:
        """ Return the query's main table, i.e., the first one in the FROM clause. """
        return next(iter(self._tables))

    @property
    def from_clause(self) -> SQL:
        """ Return the FROM clause of ``self``, without the FROM keyword. """
        # tables are separated by commas, then joins follow separated by spaces
        tables = SQL(", ").join(itertools.starmap(_sql_from_table, self._tables.items()))
        if not self._joins:
            return tables
        items = (
            tables,
            *(
                _sql_from_join(kind, alias, table, condition)
                for alias, (kind, table, condition) in self._joins.items()
            ),
        )
        return SQL(" ").join(items)

    @property
    def where_clause(self) -> SQL:
        """ Return the WHERE condition of ``self``, without the WHERE keyword. """
        return SQL(" AND ").join(self._where_clauses)

    def is_empty(self) -> bool:
        """ Return whether the query is known to return nothing. """
        return self._ids == ()

    def select(self, *args: str | SQL) -> SQL:
        """ Return the SELECT query as an ``SQL`` object.

        :param args: the expressions to select; defaults to the column ``id``
            of the main table

        Every optional clause is only emitted when its value is truthy; in
        particular, a ``limit`` or ``offset`` of ``None`` or ``0`` emits no
        LIMIT or OFFSET clause.
        """
        sql_args = map(SQL, args) if args else [SQL.identifier(self.table, 'id')]
        return SQL(
            "%s%s%s%s%s%s%s%s",
            SQL("SELECT %s", SQL(", ").join(sql_args)),
            SQL(" FROM %s", self.from_clause),
            SQL(" WHERE %s", self.where_clause) if self._where_clauses else SQL(),
            SQL(" GROUP BY %s", self.groupby) if self.groupby else SQL(),
            SQL(" HAVING %s", self.having) if self.having else SQL(),
            SQL(" ORDER BY %s", self._order) if self._order else SQL(),
            SQL(" LIMIT %s", self.limit) if self.limit else SQL(),
            SQL(" OFFSET %s", self.offset) if self.offset else SQL(),
        )

    def subselect(self, *args: str | SQL) -> SQL:
        """ Similar to :meth:`.select`, but for sub-queries.
        This one avoids the ORDER BY clause when possible,
        and includes parentheses around the subquery.

        When the result ids are already known and no ``args`` are given, the
        ids themselves are injected instead of a subquery.
        """
        if self._ids is not None and not args:
            # inject the known result instead of the subquery
            if not self._ids:
                # in case we have nothing, we want to use a sub_query with no records
                # because an empty tuple leads to a syntax error
                # and a tuple containing just None creates issues for `NOT IN`
                return SQL("(SELECT 1 WHERE FALSE)")
            return SQL("%s", self._ids)

        if self.limit or self.offset:
            # in this case, the ORDER BY clause is necessary
            return SQL("(%s)", self.select(*args))

        # neither limit nor offset: only SELECT, FROM and WHERE are rendered
        sql_args = map(SQL, args) if args else [SQL.identifier(self.table, 'id')]
        return SQL(
            "(%s%s%s)",
            SQL("SELECT %s", SQL(", ").join(sql_args)),
            SQL(" FROM %s", self.from_clause),
            SQL(" WHERE %s", self.where_clause) if self._where_clauses else SQL(),
        )

    def get_result_ids(self) -> tuple[int, ...]:
        """ Return the result of ``self.select()`` as a tuple of ids. The result
        is memoized for future use, which avoids making the same query twice.
        """
        if self._ids is None:
            # every returned row is expected to be a 1-tuple holding an id
            self._ids = tuple(id_ for id_, in self._env.execute_query(self.select()))
        return self._ids

    def set_result_ids(self, ids: Iterable[int], ordered: bool = True) -> None:
        """ Set up the query to return the lines given by ``ids``. The parameter
        ``ordered`` tells whether the query must be ordered to match exactly the
        sequence ``ids``.

        The query must have no join, no where clause, no limit and no offset.
        """
        assert not (self._joins or self._where_clauses or self.limit or self.offset), \
            "Method set_result_ids() can only be called on a virgin Query"
        ids = tuple(ids)
        if not ids:
            self.add_where("FALSE")
        elif ordered:
            # This guarantees that self.select() returns the results in the
            # expected order of ids:
            #   SELECT "stuff".id
            #   FROM "stuff"
            #   JOIN (SELECT * FROM unnest(%s) WITH ORDINALITY) AS "stuff__ids"
            #       ON ("stuff"."id" = "stuff__ids"."unnest")
            #   ORDER BY "stuff__ids"."ordinality"
            alias = self.join(
                self.table, 'id',
                SQL('(SELECT * FROM unnest(%s) WITH ORDINALITY)', list(ids)), 'unnest',
                'ids',
            )
            self.order = SQL.identifier(alias, 'ordinality')
        else:
            self.add_where(SQL("%s IN %s", SQL.identifier(self.table, 'id'), ids))
        # must come last: the modifications above drop the memoized result
        self._ids = ids

    def __str__(self) -> str:
        sql = self.select()
        return f"<Query: {sql.code!r} with params: {sql.params!r}>"

    def __bool__(self) -> bool:
        # note: this fetches (and memoizes) the result ids when not known yet
        return bool(self.get_result_ids())

    def __len__(self) -> int:
        if self._ids is None:
            # the result is not known: count the rows without fetching them
            if self.limit or self.offset:
                # optimization: generate a SELECT FROM, and then count the rows
                sql = SQL("SELECT COUNT(*) FROM (%s) t", self.select(""))
            else:
                sql = self.select('COUNT(*)')
            return self._env.execute_query(sql)[0][0]
        return len(self.get_result_ids())

    def __iter__(self) -> Iterator[int]:
        return iter(self.get_result_ids())