Coverage for adhoc-cicd-odoo-odoo / odoo / tools / sql.py: 73%

289 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1# Part of Odoo. See LICENSE file for full copyright and licensing details. 

2# pylint: disable=sql-injection 

3from __future__ import annotations 

4 

5import enum 

6import json 

7import logging 

8import re 

9import warnings 

10from binascii import crc32 

11from collections import defaultdict 

12from typing import TYPE_CHECKING 

13 

14if TYPE_CHECKING: 

15 from odoo.fields import Field 

16 from collections.abc import Iterable 

17 

18import psycopg2 

19 

20from .misc import named_to_positional_printf 

21 

22__all__ = [ 

23 "SQL", 

24 "create_index", 

25 "drop_view_if_exists", 

26 "escape_psql", 

27 "index_exists", 

28 "make_identifier", 

29 "make_index_name", 

30 "reverse_order", 

31] 

32 

33_schema = logging.getLogger('odoo.schema') 

34 

35IDENT_RE = re.compile(r'^[a-z0-9_][a-z0-9_$\-]*$', re.I) 

36 

37_CONFDELTYPES = { 

38 'RESTRICT': 'r', 

39 'NO ACTION': 'a', 

40 'CASCADE': 'c', 

41 'SET NULL': 'n', 

42 'SET DEFAULT': 'd', 

43} 

44 

45 

46class SQL: 

47 """ An object that wraps SQL code with its parameters, like:: 

48 

49 sql = SQL("UPDATE TABLE foo SET a = %s, b = %s", 'hello', 42) 

50 cr.execute(sql) 

51 

52 The code is given as a ``%``-format string, and supports either positional 

53 arguments (with `%s`) or named arguments (with `%(name)s`). The arguments 

54 are meant to be merged into the code using the `%` formatting operator. 

55 Note that the character ``%`` must always be escaped (as ``%%``), even if 

56 the code does not have parameters, like in ``SQL("foo LIKE 'a%%'")``. 

57 

58 The SQL wrapper is designed to be composable: the arguments can be either 

59 actual parameters, or SQL objects themselves:: 

60 

61 sql = SQL( 

62 "UPDATE TABLE %s SET %s", 

63 SQL.identifier(tablename), 

64 SQL("%s = %s", SQL.identifier(columnname), value), 

65 ) 

66 

67 The combined SQL code is given by ``sql.code``, while the corresponding 

68 combined parameters are given by the list ``sql.params``. This allows to 

69 combine any number of SQL terms without having to separately combine their 

70 parameters, which can be tedious, bug-prone, and is the main downside of 

71 `psycopg2.sql <https://www.psycopg.org/docs/sql.html>`. 

72 

73 The second purpose of the wrapper is to discourage SQL injections. Indeed, 

74 if ``code`` is a string literal (not a dynamic string), then the SQL object 

75 made with ``code`` is guaranteed to be safe, provided the SQL objects 

76 within its parameters are themselves safe. 

77 

78 The wrapper may also contain some metadata ``to_flush``. If not ``None``, 

79 its value is a field which the SQL code depends on. The metadata of a 

80 wrapper and its parts can be accessed by the iterator ``sql.to_flush``. 

81 """ 

82 __slots__ = ('__code', '__params', '__to_flush') 

83 

84 __code: str 

85 __params: tuple 

86 __to_flush: tuple[Field, ...] 

87 

88 # pylint: disable=keyword-arg-before-vararg 

89 def __init__(self, code: (str | SQL) = "", /, *args, to_flush: (Field | Iterable[Field] | None) = None, **kwargs): 

90 if isinstance(code, SQL): 

91 if args or kwargs or to_flush: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 raise TypeError("SQL() unexpected arguments when code has type SQL") 

93 self.__code = code.__code 

94 self.__params = code.__params 

95 self.__to_flush = code.__to_flush 

96 return 

97 

98 # validate the format of code and parameters 

99 if args and kwargs: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true

100 raise TypeError("SQL() takes either positional arguments, or named arguments") 

101 

102 if kwargs: 

103 code, args = named_to_positional_printf(code, kwargs) 

104 elif not args: 

105 code % () # check that code does not contain %s 

106 self.__code = code 

107 self.__params = () 

108 if to_flush is None: 

109 self.__to_flush = () 

110 elif hasattr(to_flush, '__iter__'): 

111 self.__to_flush = tuple(to_flush) 

112 else: 

113 self.__to_flush = (to_flush,) 

114 return 

115 

116 code_list = [] 

117 params_list = [] 

118 to_flush_list = [] 

119 for arg in args: 

120 if isinstance(arg, SQL): 

121 code_list.append(arg.__code) 

122 params_list.extend(arg.__params) 

123 to_flush_list.extend(arg.__to_flush) 

124 else: 

125 code_list.append("%s") 

126 params_list.append(arg) 

127 if to_flush is not None: 

128 if hasattr(to_flush, '__iter__'): 

129 to_flush_list.extend(to_flush) 

130 else: 

131 to_flush_list.append(to_flush) 

132 

133 self.__code = code.replace('%%', '%%%%') % tuple(code_list) 

134 self.__params = tuple(params_list) 

135 self.__to_flush = tuple(to_flush_list) 

136 

137 @property 

138 def code(self) -> str: 

139 """ Return the combined SQL code string. """ 

140 return self.__code 

141 

142 @property 

143 def params(self) -> list: 

144 """ Return the combined SQL code params as a list of values. """ 

145 return list(self.__params) 

146 

147 @property 

148 def to_flush(self) -> Iterable[Field]: 

149 """ Return an iterator on the fields to flush in the metadata of 

150 ``self`` and all of its parts. 

151 """ 

152 return self.__to_flush 

153 

154 def __repr__(self): 

155 return f"SQL({', '.join(map(repr, [self.__code, *self.__params]))})" 

156 

157 def __bool__(self): 

158 return bool(self.__code) 

159 

160 def __eq__(self, other): 

161 return isinstance(other, SQL) and self.__code == other.__code and self.__params == other.__params 

162 

163 def __hash__(self): 

164 return hash((self.__code, self.__params)) 

165 

166 def __iter__(self): 

167 """ Yields ``self.code`` and ``self.params``. This was introduced for 

168 backward compatibility, as it enables to access the SQL and parameters 

169 by deconstructing the object:: 

170 

171 sql = SQL(...) 

172 code, params = sql 

173 """ 

174 warnings.warn("Deprecated since 19.0, use code and params properties directly", DeprecationWarning) 

175 yield self.code 

176 yield self.params 

177 

178 def join(self, args: Iterable) -> SQL: 

179 """ Join SQL objects or parameters with ``self`` as a separator. """ 

180 args = list(args) 

181 # optimizations for special cases 

182 if len(args) == 0: 

183 return SQL() 

184 if len(args) == 1 and isinstance(args[0], SQL): 

185 return args[0] 

186 if not self.__params: 186 ↛ 189line 186 didn't jump to line 189 because the condition on line 186 was always true

187 return SQL(self.__code.join("%s" for arg in args), *args) 

188 # general case: alternate args with self 

189 items = [self] * (len(args) * 2 - 1) 

190 for index, arg in enumerate(args): 

191 items[index * 2] = arg 

192 return SQL("%s" * len(items), *items) 

193 

194 @classmethod 

195 def identifier(cls, name: str, subname: (str | None) = None, to_flush: (Field | None) = None) -> SQL: 

196 """ Return an SQL object that represents an identifier. """ 

197 assert name.isidentifier() or IDENT_RE.match(name), f"{name!r} invalid for SQL.identifier()" 

198 if subname is None: 

199 return cls(f'"{name}"', to_flush=to_flush) 

200 assert subname.isidentifier() or IDENT_RE.match(subname), f"{subname!r} invalid for SQL.identifier()" 

201 return cls(f'"{name}"."{subname}"', to_flush=to_flush) 

202 

203 

204def existing_tables(cr, tablenames): 

205 """ Return the names of existing tables among ``tablenames``. """ 

206 cr.execute(SQL(""" 

207 SELECT c.relname 

208 FROM pg_class c 

209 WHERE c.relname IN %s 

210 AND c.relkind IN ('r', 'v', 'm') 

211 AND c.relnamespace = current_schema::regnamespace 

212 """, tuple(tablenames))) 

213 return [row[0] for row in cr.fetchall()] 

214 

215 

216def table_exists(cr, tablename): 

217 """ Return whether the given table exists. """ 

218 return len(existing_tables(cr, {tablename})) == 1 

219 

220 

221class TableKind(enum.Enum): 

222 Regular = 'r' 

223 Temporary = 't' 

224 View = 'v' 

225 Materialized = 'm' 

226 Foreign = 'f' 

227 Other = None 

228 

229 

230def table_kind(cr, tablename: str) -> TableKind | None: 

231 """ Return the kind of a table, if ``tablename`` is a regular or foreign 

232 table, or a view (ignores indexes, sequences, toast tables, and partitioned 

233 tables; unlogged tables are considered regular) 

234 """ 

235 cr.execute(SQL(""" 

236 SELECT c.relkind, c.relpersistence 

237 FROM pg_class c 

238 WHERE c.relname = %s 

239 AND c.relnamespace = current_schema::regnamespace 

240 """, tablename)) 

241 if not cr.rowcount: 

242 return None 

243 

244 kind, persistence = cr.fetchone() 

245 # special case: permanent, temporary, and unlogged tables differ by their 

246 # relpersistence, they're all "ordinary" (relkind = r) 

247 if kind == 'r': 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 return TableKind.Temporary if persistence == 't' else TableKind.Regular 

249 

250 try: 

251 return TableKind(kind) 

252 except ValueError: 

253 # NB: or raise? unclear if it makes sense to allow table_kind to 

254 # "work" with something like an index or sequence 

255 return TableKind.Other 

256 

257 

258# prescribed column order by type: columns aligned on 4 bytes, columns aligned 

259# on 1 byte, columns aligned on 8 bytes(values have been chosen to minimize 

260# padding in rows; unknown column types are put last) 

261SQL_ORDER_BY_TYPE = defaultdict(lambda: 16, { 

262 'int4': 1, # 4 bytes aligned on 4 bytes 

263 'varchar': 2, # variable aligned on 4 bytes 

264 'date': 3, # 4 bytes aligned on 4 bytes 

265 'jsonb': 4, # jsonb 

266 'text': 5, # variable aligned on 4 bytes 

267 'numeric': 6, # variable aligned on 4 bytes 

268 'bool': 7, # 1 byte aligned on 1 byte 

269 'timestamp': 8, # 8 bytes aligned on 8 bytes 

270 'float8': 9, # 8 bytes aligned on 8 bytes 

271}) 

272 

273 

274def create_model_table(cr, tablename, comment=None, columns=()): 

275 """ Create the table for a model. """ 

276 colspecs = [ 

277 SQL('id SERIAL NOT NULL'), 

278 *(SQL("%s %s", SQL.identifier(colname), SQL(coltype)) for colname, coltype, _ in columns), 

279 SQL('PRIMARY KEY(id)'), 

280 ] 

281 queries = [ 

282 SQL("CREATE TABLE %s (%s)", SQL.identifier(tablename), SQL(", ").join(colspecs)), 

283 ] 

284 if comment: 284 ↛ 289line 284 didn't jump to line 289 because the condition on line 284 was always true

285 queries.append(SQL( 

286 "COMMENT ON TABLE %s IS %s", 

287 SQL.identifier(tablename), comment, 

288 )) 

289 for colname, _, colcomment in columns: 

290 queries.append(SQL( 

291 "COMMENT ON COLUMN %s IS %s", 

292 SQL.identifier(tablename, colname), colcomment, 

293 )) 

294 cr.execute(SQL("; ").join(queries)) 

295 

296 _schema.debug("Table %r: created", tablename) 

297 

298 

299def table_columns(cr, tablename): 

300 """ Return a dict mapping column names to their configuration. The latter is 

301 a dict with the data from the table ``information_schema.columns``. 

302 """ 

303 # Do not select the field `character_octet_length` from `information_schema.columns` 

304 # because specific access right restriction in the context of shared hosting (Heroku, OVH, ...) 

305 # might prevent a postgres user to read this field. 

306 cr.execute(SQL( 

307 ''' SELECT column_name, udt_name, character_maximum_length, is_nullable 

308 FROM information_schema.columns WHERE table_name=%s 

309 AND table_schema = current_schema ''', 

310 tablename, 

311 )) 

312 return {row['column_name']: row for row in cr.dictfetchall()} 

313 

314 

315def column_exists(cr, tablename, columnname): 

316 """ Return whether the given column exists. """ 

317 cr.execute(SQL( 

318 """ SELECT 1 FROM information_schema.columns 

319 WHERE table_name=%s AND column_name=%s 

320 AND table_schema = current_schema """, 

321 tablename, columnname, 

322 )) 

323 return cr.rowcount 

324 

325 

326def create_column(cr, tablename, columnname, columntype, comment=None): 

327 """ Create a column with the given type. """ 

328 sql = SQL( 

329 "ALTER TABLE %s ADD COLUMN %s %s %s", 

330 SQL.identifier(tablename), 

331 SQL.identifier(columnname), 

332 SQL(columntype), 

333 SQL("DEFAULT false" if columntype.upper() == 'BOOLEAN' else ""), 

334 ) 

335 if comment: 

336 sql = SQL("%s; %s", sql, SQL( 

337 "COMMENT ON COLUMN %s IS %s", 

338 SQL.identifier(tablename, columnname), comment, 

339 )) 

340 cr.execute(sql) 

341 _schema.debug("Table %r: added column %r of type %s", tablename, columnname, columntype) 

342 

343 

344def rename_column(cr, tablename, columnname1, columnname2): 

345 """ Rename the given column. """ 

346 cr.execute(SQL( 

347 "ALTER TABLE %s RENAME COLUMN %s TO %s", 

348 SQL.identifier(tablename), 

349 SQL.identifier(columnname1), 

350 SQL.identifier(columnname2), 

351 )) 

352 _schema.debug("Table %r: renamed column %r to %r", tablename, columnname1, columnname2) 

353 

354 

355def convert_column(cr, tablename, columnname, columntype): 

356 """ Convert the column to the given type. """ 

357 using = SQL("%s::%s", SQL.identifier(columnname), SQL(columntype)) 

358 _convert_column(cr, tablename, columnname, columntype, using) 

359 

360 

361def convert_column_translatable(cr, tablename, columnname, columntype): 

362 """ Convert the column from/to a 'jsonb' translated field column. """ 

363 drop_index(cr, make_index_name(tablename, columnname), tablename) 

364 if columntype == "jsonb": 

365 using = SQL( 

366 "CASE WHEN %s IS NOT NULL THEN jsonb_build_object('en_US', %s::varchar) END", 

367 SQL.identifier(columnname), SQL.identifier(columnname), 

368 ) 

369 else: 

370 using = SQL("%s->>'en_US'", SQL.identifier(columnname)) 

371 _convert_column(cr, tablename, columnname, columntype, using) 

372 

373 

374def _convert_column(cr, tablename, columnname, columntype, using: SQL): 

375 query = SQL( 

376 "ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT, ALTER COLUMN %s TYPE %s USING %s", 

377 SQL.identifier(tablename), SQL.identifier(columnname), 

378 SQL.identifier(columnname), SQL(columntype), using, 

379 ) 

380 try: 

381 with cr.savepoint(flush=False): 

382 cr.execute(query, log_exceptions=False) 

383 except psycopg2.NotSupportedError: 

384 drop_depending_views(cr, tablename, columnname) 

385 cr.execute(query) 

386 _schema.debug("Table %r: column %r changed to type %s", tablename, columnname, columntype) 

387 

388 

389def drop_depending_views(cr, table, column): 

390 """drop views depending on a field to allow the ORM to resize it in-place""" 

391 for v, k in get_depending_views(cr, table, column): 

392 cr.execute(SQL( 

393 "DROP %s IF EXISTS %s CASCADE", 

394 SQL("MATERIALIZED VIEW" if k == "m" else "VIEW"), 

395 SQL.identifier(v), 

396 )) 

397 _schema.debug("Drop view %r", v) 

398 

399 

400def get_depending_views(cr, table, column): 

401 # http://stackoverflow.com/a/11773226/75349 

402 cr.execute(SQL(""" 

403 SELECT distinct quote_ident(dependee.relname), dependee.relkind 

404 FROM pg_depend 

405 JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid 

406 JOIN pg_class as dependee ON pg_rewrite.ev_class = dependee.oid 

407 JOIN pg_class as dependent ON pg_depend.refobjid = dependent.oid 

408 JOIN pg_attribute ON pg_depend.refobjid = pg_attribute.attrelid 

409 AND pg_depend.refobjsubid = pg_attribute.attnum 

410 WHERE dependent.relname = %s 

411 AND dependent.relnamespace = current_schema::regnamespace 

412 AND pg_attribute.attnum > 0 

413 AND pg_attribute.attname = %s 

414 AND dependee.relkind in ('v', 'm') 

415 """, table, column)) 

416 return cr.fetchall() 

417 

418 

419def set_not_null(cr, tablename, columnname): 

420 """ Add a NOT NULL constraint on the given column. """ 

421 query = SQL( 

422 "ALTER TABLE %s ALTER COLUMN %s SET NOT NULL", 

423 SQL.identifier(tablename), SQL.identifier(columnname), 

424 ) 

425 cr.execute(query, log_exceptions=False) 

426 _schema.debug("Table %r: column %r: added constraint NOT NULL", tablename, columnname) 

427 

428 

429def drop_not_null(cr, tablename, columnname): 

430 """ Drop the NOT NULL constraint on the given column. """ 

431 cr.execute(SQL( 

432 "ALTER TABLE %s ALTER COLUMN %s DROP NOT NULL", 

433 SQL.identifier(tablename), SQL.identifier(columnname), 

434 )) 

435 _schema.debug("Table %r: column %r: dropped constraint NOT NULL", tablename, columnname) 

436 

437 

438def constraint_definition(cr, tablename, constraintname): 

439 """ Return the given constraint's definition. """ 

440 cr.execute(SQL(""" 

441 SELECT COALESCE(d.description, pg_get_constraintdef(c.oid)) 

442 FROM pg_constraint c 

443 JOIN pg_class t ON t.oid = c.conrelid 

444 LEFT JOIN pg_description d ON c.oid = d.objoid 

445 WHERE t.relname = %s AND conname = %s 

446 AND t.relnamespace = current_schema::regnamespace 

447 """, tablename, constraintname)) 

448 return cr.fetchone()[0] if cr.rowcount else None 

449 

450 

451def add_constraint(cr, tablename, constraintname, definition): 

452 """ Add a constraint on the given table. """ 

453 query1 = SQL( 

454 "ALTER TABLE %s ADD CONSTRAINT %s %s", 

455 SQL.identifier(tablename), SQL.identifier(constraintname), SQL(definition.replace('%', '%%')), 

456 ) 

457 query2 = SQL( 

458 "COMMENT ON CONSTRAINT %s ON %s IS %s", 

459 SQL.identifier(constraintname), SQL.identifier(tablename), definition, 

460 ) 

461 cr.execute(query1, log_exceptions=False) 

462 cr.execute(query2, log_exceptions=False) 

463 _schema.debug("Table %r: added constraint %r as %s", tablename, constraintname, definition) 

464 

465 

466def drop_constraint(cr, tablename, constraintname): 

467 """ Drop the given constraint. """ 

468 cr.execute(SQL( 

469 "ALTER TABLE %s DROP CONSTRAINT %s", 

470 SQL.identifier(tablename), SQL.identifier(constraintname), 

471 )) 

472 _schema.debug("Table %r: dropped constraint %r", tablename, constraintname) 

473 

474 

475def add_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete): 

476 """ Create the given foreign key, and return ``True``. """ 

477 cr.execute(SQL( 

478 "ALTER TABLE %s ADD FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE %s", 

479 SQL.identifier(tablename1), SQL.identifier(columnname1), 

480 SQL.identifier(tablename2), SQL.identifier(columnname2), 

481 SQL(ondelete), 

482 )) 

483 _schema.debug("Table %r: added foreign key %r references %r(%r) ON DELETE %s", 

484 tablename1, columnname1, tablename2, columnname2, ondelete) 

485 

486 

487def get_foreign_keys(cr, tablename1, columnname1, tablename2, columnname2, ondelete): 

488 deltype = _CONFDELTYPES[ondelete.upper()] 

489 cr.execute(SQL( 

490 """ 

491 SELECT fk.conname as name 

492 FROM pg_constraint AS fk 

493 JOIN pg_class AS c1 ON fk.conrelid = c1.oid 

494 JOIN pg_class AS c2 ON fk.confrelid = c2.oid 

495 JOIN pg_attribute AS a1 ON a1.attrelid = c1.oid AND fk.conkey[1] = a1.attnum 

496 JOIN pg_attribute AS a2 ON a2.attrelid = c2.oid AND fk.confkey[1] = a2.attnum 

497 WHERE fk.contype = 'f' 

498 AND c1.relname = %s 

499 AND a1.attname = %s 

500 AND c2.relname = %s 

501 AND a2.attname = %s 

502 AND c1.relnamespace = current_schema::regnamespace 

503 AND fk.confdeltype = %s 

504 """, 

505 tablename1, columnname1, tablename2, columnname2, deltype, 

506 )) 

507 return [r[0] for r in cr.fetchall()] 

508 

509 

510def fix_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete): 

511 """ Update the foreign keys between tables to match the given one, and 

512 return ``True`` if the given foreign key has been recreated. 

513 """ 

514 # Do not use 'information_schema' here, as those views are awfully slow! 

515 deltype = _CONFDELTYPES.get(ondelete.upper(), 'a') 

516 cr.execute(SQL( 

517 """ SELECT con.conname, c2.relname, a2.attname, con.confdeltype as deltype 

518 FROM pg_constraint as con, pg_class as c1, pg_class as c2, 

519 pg_attribute as a1, pg_attribute as a2 

520 WHERE con.contype='f' AND con.conrelid=c1.oid AND con.confrelid=c2.oid 

521 AND array_lower(con.conkey, 1)=1 AND con.conkey[1]=a1.attnum 

522 AND array_lower(con.confkey, 1)=1 AND con.confkey[1]=a2.attnum 

523 AND a1.attrelid=c1.oid AND a2.attrelid=c2.oid 

524 AND c1.relname=%s AND a1.attname=%s 

525 AND c1.relnamespace = current_schema::regnamespace """, 

526 tablename1, columnname1, 

527 )) 

528 found = False 

529 for fk in cr.fetchall(): 

530 if not found and fk[1:] == (tablename2, columnname2, deltype): 

531 found = True 

532 else: 

533 drop_constraint(cr, tablename1, fk[0]) 

534 if found: 

535 return False 

536 add_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete) 

537 return True 

538 

539 

540def index_exists(cr, indexname): 

541 """ Return whether the given index exists. """ 

542 cr.execute(SQL("SELECT 1 FROM pg_indexes WHERE indexname=%s" 

543 " AND schemaname = current_schema", indexname)) 

544 return cr.rowcount 

545 

546 

547def check_index_exist(cr, indexname): 

548 assert index_exists(cr, indexname), f"{indexname} does not exist" 

549 

550 

551def index_definition(cr, indexname): 

552 """ Read the index definition from the database """ 

553 cr.execute(SQL(""" 

554 SELECT idx.indexdef, d.description 

555 FROM pg_class c 

556 JOIN pg_indexes idx ON c.relname = idx.indexname 

557 LEFT JOIN pg_description d ON c.oid = d.objoid 

558 WHERE c.relname = %s AND c.relkind = 'i' 

559 AND c.relnamespace = current_schema::regnamespace 

560 """, indexname)) 

561 return cr.fetchone() if cr.rowcount else (None, None) 

562 

563 

564def create_index( 

565 cr, 

566 indexname, 

567 tablename, 

568 expressions, 

569 method='btree', 

570 where='', 

571 *, 

572 comment=None, 

573 unique=False 

574): 

575 """ Create the given index unless it exists. 

576 

577 :param cr: The cursor 

578 :param indexname: The name of the index 

579 :param tablename: The name of the table 

580 :param method: The type of the index (default: btree) 

581 :param where: WHERE clause for the index (default: '') 

582 :param comment: The comment to set on the index 

583 :param unique: Whether the index is unique or not (default: False) 

584 """ 

585 assert expressions, "Missing expressions" 

586 if index_exists(cr, indexname): 

587 return 

588 definition = SQL( 

589 "USING %s (%s)%s", 

590 SQL(method), 

591 SQL(", ").join(SQL(expression) for expression in expressions), 

592 SQL(" WHERE %s", SQL(where)) if where else SQL(), 

593 ) 

594 add_index(cr, indexname, tablename, definition, unique=unique, comment=comment) 

595 

596 

597def add_index(cr, indexname, tablename, definition, *, unique: bool, comment=''): 

598 """ Create an index. """ 

599 if isinstance(definition, str): 

600 definition = SQL(definition.replace('%', '%%')) 

601 else: 

602 definition = SQL(definition) 

603 query = SQL( 

604 "CREATE %sINDEX %s ON %s %s", 

605 SQL("UNIQUE ") if unique else SQL(), 

606 SQL.identifier(indexname), 

607 SQL.identifier(tablename), 

608 definition, 

609 ) 

610 query_comment = SQL( 

611 "COMMENT ON INDEX %s IS %s", 

612 SQL.identifier(indexname), comment, 

613 ) if comment else None 

614 cr.execute(query, log_exceptions=False) 

615 if query_comment: 

616 cr.execute(query_comment, log_exceptions=False) 

617 _schema.debug("Table %r: created index %r (%s)", tablename, indexname, definition.code) 

618 

619 

620def create_unique_index(cr, indexname, tablename, expressions): 

621 """ Create the given index unless it exists. """ 

622 warnings.warn("Since 19.0, use create_index(unique=True)", DeprecationWarning) 

623 return create_index(cr, indexname, tablename, expressions, unique=True) 

624 

625 

626def drop_index(cr, indexname, tablename): 

627 """ Drop the given index if it exists. """ 

628 cr.execute(SQL("DROP INDEX IF EXISTS %s", SQL.identifier(indexname))) 

629 _schema.debug("Table %r: dropped index %r", tablename, indexname) 

630 

631 

632def drop_view_if_exists(cr, viewname): 

633 kind = table_kind(cr, viewname) 

634 if kind == TableKind.View: 

635 cr.execute(SQL("DROP VIEW %s CASCADE", SQL.identifier(viewname))) 

636 elif kind == TableKind.Materialized: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true

637 cr.execute(SQL("DROP MATERIALIZED VIEW %s CASCADE", SQL.identifier(viewname))) 

638 

639 

640def escape_psql(to_escape): 

641 return to_escape.replace('\\', r'\\').replace('%', r'\%').replace('_', r'\_') 

642 

643 

644def pg_varchar(size=0): 

645 """ Returns the VARCHAR declaration for the provided size: 

646 

647 * If no size (or an empty or negative size is provided) return an 

648 'infinite' VARCHAR 

649 * Otherwise return a VARCHAR(n) 

650 

651 :param int size: varchar size, optional 

652 :rtype: str 

653 """ 

654 if size: 

655 if not isinstance(size, int): 655 ↛ 656line 655 didn't jump to line 656 because the condition on line 655 was never true

656 raise ValueError("VARCHAR parameter should be an int, got %s" % type(size)) 

657 if size > 0: 657 ↛ 659line 657 didn't jump to line 659 because the condition on line 657 was always true

658 return 'VARCHAR(%d)' % size 

659 return 'VARCHAR' 

660 

661 

662def reverse_order(order): 

663 """ Reverse an ORDER BY clause """ 

664 items = [] 

665 for item in order.split(','): 

666 item = item.lower().split() 

667 direction = 'asc' if item[1:] == ['desc'] else 'desc' 

668 items.append('%s %s' % (item[0], direction)) 

669 return ', '.join(items) 

670 

671 

672def increment_fields_skiplock(records, *fields): 

673 """ 

674 Increment 'friendly' the given `fields` of the current `records`. 

675 If record is locked, we just skip the update. 

676 It doesn't invalidate the cache since the update is not critical. 

677 

678 :param records: recordset to update 

679 :param fields: integer fields to increment 

680 :returns: whether the specified fields were incremented on any record. 

681 :rtype: bool 

682 """ 

683 if not records: 

684 return False 

685 

686 for field in fields: 

687 assert records._fields[field].type == 'integer' 

688 

689 cr = records.env.cr 

690 tablename = records._table 

691 cr.execute(SQL( 

692 """ 

693 UPDATE %s 

694 SET %s 

695 WHERE id IN (SELECT id FROM %s WHERE id = ANY(%s) FOR UPDATE SKIP LOCKED) 

696 """, 

697 SQL.identifier(tablename), 

698 SQL(', ').join( 

699 SQL("%s = COALESCE(%s, 0) + 1", SQL.identifier(field), SQL.identifier(field)) 

700 for field in fields 

701 ), 

702 SQL.identifier(tablename), 

703 records.ids, 

704 )) 

705 return bool(cr.rowcount) 

706 

707 

708def value_to_translated_trigram_pattern(value): 

709 """ Escape value to match a translated field's trigram index content 

710 

711 The trigram index function jsonb_path_query_array("column_name", '$.*')::text 

712 uses all translations' representations to build the indexed text. So the 

713 original text needs to be JSON-escaped correctly to match it. 

714 

715 :param str value: value provided in domain 

716 :return: a pattern to match the indexed text 

717 """ 

718 if len(value) < 3: 

719 # matching less than 3 characters will not take advantage of the index 

720 return '%' 

721 

722 # apply JSON escaping to value; the argument ensure_ascii=False prevents 

723 # json.dumps from escaping unicode to ascii, which is consistent with the 

724 # index function jsonb_path_query_array("column_name", '$.*')::text 

725 json_escaped = json.dumps(value, ensure_ascii=False)[1:-1] 

726 

727 # apply PG wildcard escaping to JSON-escaped text 

728 wildcard_escaped = re.sub(r'(_|%|\\)', r'\\\1', json_escaped) 

729 

730 # add wildcards around it to get the pattern 

731 return f"%{wildcard_escaped}%" 

732 

733 

734def pattern_to_translated_trigram_pattern(pattern): 

735 """ Escape pattern to match a translated field's trigram index content 

736 

737 The trigram index function jsonb_path_query_array("column_name", '$.*')::text 

738 uses all translations' representations to build the indexed text. So the 

739 original pattern needs to be JSON-escaped correctly to match it. 

740 

741 :param str pattern: value provided in domain 

742 :return: a pattern to match the indexed text 

743 """ 

744 # find the parts around (non-escaped) wildcard characters (_, %) 

745 sub_patterns = re.findall(r''' 

746 ( 

747 (?:.)*? # 0 or more charaters including the newline character 

748 (?<!\\)(?:\\\\)* # 0 or even number of backslashes to promise the next wildcard character is not escaped 

749 ) 

750 (?:_|%|$) # a non-escaped wildcard charater or end of the string 

751 ''', pattern, flags=re.VERBOSE | re.DOTALL) 

752 

753 # unescape PG wildcards from each sub pattern (\% becomes %) 

754 sub_texts = [re.sub(r'\\(.|$)', r'\1', t, flags=re.DOTALL) for t in sub_patterns] 

755 

756 # apply JSON escaping to sub texts having at least 3 characters (" becomes \"); 

757 # the argument ensure_ascii=False prevents from escaping unicode to ascii 

758 json_escaped = [json.dumps(t, ensure_ascii=False)[1:-1] for t in sub_texts if len(t) >= 3] 

759 

760 # apply PG wildcard escaping to JSON-escaped texts (% becomes \%) 

761 wildcard_escaped = [re.sub(r'(_|%|\\)', r'\\\1', t) for t in json_escaped] 

762 

763 # replace the original wildcard characters by % 

764 return f"%{'%'.join(wildcard_escaped)}%" if wildcard_escaped else "%" 

765 

766 

767def make_identifier(identifier: str) -> str: 

768 """ Return ``identifier``, possibly modified to fit PostgreSQL's identifier size limitation. 

769 If too long, ``identifier`` is truncated and padded with a hash to make it mostly unique. 

770 """ 

771 # if length exceeds the PostgreSQL limit of 63 characters. 

772 if len(identifier) > 63: 

773 # We have to fit a crc32 hash and one underscore into a 63 character 

774 # alias. The remaining space we can use to add a human readable prefix. 

775 return f"{identifier[:54]}_{crc32(identifier.encode()):08x}" 

776 return identifier 

777 

778 

779def make_index_name(table_name: str, column_name: str) -> str: 

780 """ Return an index name according to conventions for the given table and column. """ 

781 return make_identifier(f"{table_name}__{column_name}_index")