Coverage for ingadhoc-odoo-saas-adhoc / saas_provider_upgrade / models / saas_upgrade_line.py: 42%

491 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1############################################################################## 

2# For copyright and license notices, see __manifest__.py file in module root 

3# directory 

4############################################################################## 

5import logging 

6import re 

7import traceback 

8from textwrap import dedent 

9from typing import TYPE_CHECKING 

10 

11from markupsafe import Markup 

12from odoo import _, api, fields, models 

13from odoo.exceptions import RedirectWarning, UserError, ValidationError 

14from odoo.fields import Domain 

15from odoo.http import request 

16from odoo.tools import SQL, is_html_empty, safe_eval 

17from odoo.tools.misc import get_diff 

18from odooly import Client 

19 

20from ..constants import MAP_REQUEST_STATES, UL_EXECUTION_MODES, UL_TYPE_DB_REQUIREMENTS 

21from ..exceptions import UpgradeLineException 

22 

# Snippet meant to be pasted into an Odoo shell. The three positional ``{}``
# slots are filled by ``_compute_shell_command`` with, in order: the request id,
# the upgrade line id and the ``no_new_client`` flag.
# Once the eval context has been built, its entries are copied into the shell
# globals so they can be used interactively.
SHELL_COMMAND_TEMPLATE = """
request = env['helpdesk.ticket.upgrade.request'].browse({})
upgrade_line = env['saas.upgrade.line'].browse({})
eval_context = dict()
with request.with_context(no_new_client={}, no_close_cursor=True)._get_eval_context(eval_context, upgrade_line=upgrade_line):
    pass
for key, value in eval_context.items():
    globals()[key] = value
"""

32 

33if TYPE_CHECKING: 

34 from saas_provider_upgrade.models.helpdesk_ticket_upgrade_request import ( 

35 HelpdeskTicketUpgradeRequest as UpgradeRequest, 

36 ) 

37 from saas_provider_upgrade.models.saas_upgrade_line_request_run import ( 

38 SaasUpgradeLineRequestRun as UpgradeRequestRun, 

39 ) 

40 from saas_provider_upgrade.models.saas_upgrade_line_script import ( 

41 SaasUpgradeLineScript as UpgradeLineScript, 

42 ) 

43 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

45 

46 

class SaasUpgradeLine(models.Model):
    """Upgrade line: a script, optionally paired with a customer note, that runs
    at the stage of an upgrade request selected by ``type``."""

    _name = "saas.upgrade.line"
    _inherit = ["mail.thread", "mail.activity.mixin", "saas.provider.upgrade.util"]
    _description = "Upgrade Lines"
    # ``type`` values carry a numeric prefix, so ordering by it follows the stage order.
    _order = "type, sequence, name"

    # --- Ordering and identification -------------------------------------
    sequence = fields.Integer(
        default=10,
        help="Sequence can be used to run an upgrade line before another of the same type",
        copy=False,
    )
    customer_note_sequence = fields.Integer(
        default=10,
        help="Customer notes are sorted by sequence and then by product. "
        "By default, all will have the same sequence, but you can prioritize by changing it here",
        copy=False,
    )
    name = fields.Char(
        required=True,
    )
    # Stage at which the line runs; see the help text for the meaning of each value.
    type = fields.Selection(
        [
            ("0_on_create", "On Create"),
            ("1_evaluation", "Evaluation"),
            ("2_pre", "Pre Odoo"),
            ("3_pre-adhoc", "Pre Adhoc"),
            ("4_post", "Post Adhoc"),
            ("5_test", "Test & C. Notes"),
            ("6_after_done", "After Done"),
        ],
        help="The purpose and stage where the script will run depends on its type:\n"
        "* 0- On Create: runs before creating the request, intended to enforce constraints when creating a request. "
        "IMPORTANT: no client database is available yet.\n"
        "* 1- Evaluation: runs when the request is in draft, used to evaluate if everything is ready for an upgrade. "
        "Also runs when evaluating a database to check readiness. "
        "IMPORTANT: These scripts should NOT make changes to databases.\n"
        "* 2- Pre Odoo: used to make adjustments on the 'old' database before sending it to Odoo.\n"
        "* 3- Pre Adhoc: runs after restoring the database provided by Odoo and before running our upgrade "
        "(-u all). These scripts do not have 'new_client' to avoid waking the Odoo service until the next step. "
        "'new_run_sql' or 'new_run_openupgradelib_method' is usually used.\n"
        "* 4- Post Adhoc: runs after completing the upgrade of all modules, used for final database adjustments. "
        "After these scripts, the database is activated and missing modules are auto-installed (according to adhoc module classification).\n"
        "* 5- Test & C. Notes: used for automated tests and adding additional customer notes. "
        "IMPORTANT: These scripts should NOT make changes to databases.\n"
        "* 6- After Done: runs if the upgrade completes successfully.\n",
        required=True,
        tracking=True,
        change_default=True,
    )
    customer_note_validation_script = fields.Text(
        copy=False,
    )

    # --- Applicability: where and when the line is used --------------------
    upgrade_ticket_ids = fields.Many2many(
        "helpdesk.ticket",
        domain="[('upgrade_team', '=', True)]",
        help="Used as custom upgrade on these upgrade tickets",
    )
    upgrade_type_ids = fields.Many2many(
        "saas.upgrade.type",
        "saas_upgrade_scripts_type_rel",
        "upgrade_line_id",
        "type_id",
    )
    # Both filter domains are validated with safe_eval by ``check_filter_domain_value``.
    old_filter_domain = fields.Char(
        tracking=True,
        help="If you want to evaluate not only that the module is installed, but that records exist in a model, "
        "add a dictionary where the key is the model and the value is the domain. "
        "If multiple elements are added, the warning will appear if there are records for at least one evaluation.",
    )
    new_filter_domain = fields.Char(
        tracking=True,
        help="If you want to evaluate not only that the module is installed, but that records exist in a model, "
        "add a dictionary where the key is the model and the value is the domain. "
        "If multiple elements are added, the warning will appear if there are records for at least one evaluation.",
    )
    module_ids = fields.Many2many(
        "adhoc.module",
        "adhoc_product_upgrade_line_rel",
        "upgrade_line_id",
        "module_id",
        string="Modules",
        help="If modules are defined, the action runs only if all modules are installed in the old database.",
    )
    from_major_version_ids = fields.Many2many(
        "saas.odoo.version.group",
        string="From Odoo Projects",
        tracking=True,
        help="This script will only run for upgrades coming from these versions",
    )

    # --- Customer note content ---------------------------------------------
    title = fields.Char(
        copy=False,
    )
    message = fields.Html(
        copy=False,
        prefetch=True,
        sanitize=False,
        help="Message displayed to the client in the Customer Note",
    )
    is_message_empty = fields.Boolean(
        compute="_compute_messages_empty",
        help="Computed since message is HTML; used to determine if empty for some view conditions.",
    )
    dev_message = fields.Html(
        copy=False,
        prefetch=True,
        sanitize=False,
        help="Used to test the Customer Note message. Must be approved to apply.",
    )
    is_dev_message_empty = fields.Boolean(
        compute="_compute_messages_empty",
        help="Computed since dev_message is HTML; used to determine if empty for some view conditions.",
    )
    adhoc_product_id = fields.Many2one(
        "adhoc.product",
        domain=[("parent_id", "!=", False), ("type", "=", "horizontal")],
        required=True,
        change_default=True,
    )
    technical_team_id = fields.Many2one(related="adhoc_product_id.technical_team_id")
    product_owner_id = fields.Many2one(related="adhoc_product_id.product_owner_id")
    add_customer_note = fields.Boolean()
    customer_note_validation = fields.Boolean(copy=False)
    customer_note_upload_changes = fields.Boolean(copy=False)
    force_reopen_customer_note = fields.Boolean(
        help="If set, customer notes generated from this upgrade line will be reopened if the content changes.",
    )
    active = fields.Boolean(default=True, tracking=True)
    description = fields.Html(copy=False)
    description_customer_note_validation = fields.Html(copy=False)

    # --- Request-dependent values (driven by the ``ticket_request_id`` context key)
    request_id = fields.Many2one(
        "helpdesk.ticket.upgrade.request",
        compute="_compute_request_and_request_run",
    )
    customer_note_ids = fields.One2many("helpdesk.ticket.customer_note", "upgrade_line_id")
    upgrade_line_request_run_ids = fields.One2many(
        "saas.upgrade.line.request.run",
        "upgrade_line_id",
    )
    run_info_id = fields.Many2one(
        "saas.upgrade.line.request.run",
        compute="_compute_request_and_request_run",
        help="Upgrade Line state for this request.\n"
        " - Done (duration): script finished, shows duration.\n"
        " - Running: script is currently running.\n"
        " - N/A: script does not apply to request databases.\n"
        " If script hasn't run yet or does not exist, remains empty.",
    )

    # --- Workflow ------------------------------------------------------------
    state = fields.Selection(
        [
            ("draft", "Draft"),
            ("to_correct", "To Correct"),
            ("approved", "Approved"),
        ],
        default="draft",
        tracking=True,
        copy=False,
    )
    priority = fields.Selection(
        [
            ("0", "Low Priority"),
            ("1", "High Priority"),
        ],
        default="0",
        tracking=True,
        copy=False,
    )
    helpdesk_ticket_ids = fields.Many2many(
        "helpdesk.ticket",
        "saas_upgrade_line_ticket_rel",
        "upgrade_line_id",
        "ticket_id",
        string="Tickets",
        help="Tickets associated with the Upgrade Line.",
    )
    project_task_ids = fields.Many2many(
        "project.task",
        "saas_upgrade_line_task_rel",
        "upgrade_line_id",
        "task_id",
        string="Tasks",
        help="Tasks associated with the Upgrade Line.",
    )

# Boolean flag, enabled by default. The default is the real boolean ``True``;
# the previous string "True" only worked because any non-empty string is truthy.
allow_bypass = fields.Boolean(
    "Allow bypass",
    default=True,
    help="If checked, the script is skipped when creating the request",
)

upgrade_line_request_log_ids = fields.One2many(
    "saas.upgrade.line.request.log",
    "upgrade_line_id",
)
# UI helper backed by ``from_major_version_ids``: "selected" when versions are set, "all" otherwise.
filter_by_version = fields.Selection(
    [
        ("all", "All versions"),
        ("selected", "Specific versions"),
    ],
    string="Apply from",
    compute="_compute_filter_by_version",
    inverse="_inverse_filter_by_version",
)
allowed_from_major_version_ids = fields.Many2many(
    "saas.odoo.version.group",
    compute="_compute_allowed_from_major_version_ids",
)
run_type = fields.Selection(
    [("production", "Production"), ("test", "Test")],
    string="Run in",
    help="If empty, it will run in both Test and Production. Otherwise, it will run only in the selected environment.",
)

# --- Script versions ---------------------------------------------------
# ``actual_script_id`` points at the approved version; ``script``, ``script_version``
# and ``execution_mode`` are related fields read from it.
actual_script_id = fields.Many2one(
    "saas.upgrade.line.script",
    copy=False,
)
upgrade_line_script_version_ids = fields.One2many(
    "saas.upgrade.line.script",
    "upgrade_line_id",
)
script_version = fields.Integer(
    related="actual_script_id.version",
    help="Current script version",
    tracking=True,
)
script = fields.Text(
    related="actual_script_id.script",
    store=True,
    copy=False,
)
# Work-in-progress script; ``action_approve_script`` turns it into a new script version.
dev_script = fields.Text(
    copy=False,
)
script_diff = fields.Html(
    sanitize_tags=False,
    compute="_compute_script_diff",
)

# --- Dependencies between lines of the same type -------------------------
# Both fields share one relation table with the columns swapped, so each is
# the reverse view of the other.
my_dependencies_ids = fields.Many2many(
    "saas.upgrade.line",
    "saas_upgrade_line_dependencies_rel",
    "line_id",
    "related_line_id",
    domain="[('type', '=', type)]",
    copy=False,
)
depends_on_me_ids = fields.Many2many(
    "saas.upgrade.line",
    "saas_upgrade_line_dependencies_rel",
    "related_line_id",
    "line_id",
    domain="[('type', '=', type)]",
    copy=False,
)
ul_avg_duration = fields.Float(
    compute="_compute_ul_avg_duration",
)
computed_help_info = fields.Html(
    string="Help Information",
    compute="_compute_computed_help_info",
    store=False,
    readonly=True,
)
placeholders = fields.Json(
    compute="_compute_request_and_request_run",
)
shell_command = fields.Text(
    compute="_compute_shell_command",
    help="Shell command to access the upgraded database environment via Odoo shell.",
)
module_check_logic = fields.Selection(
    [("all", "All"), ("any", "Any")],
    default="all",
    required=True,
    help="Determines module validation logic:\n"
    "- 'All': All listed modules must be installed in the old database.\n"
    "- 'Any': At least one listed module must be installed in the old database.",
)
execution_mode = fields.Selection(
    related="actual_script_id.execution_mode",
    depends=["actual_script_id", "actual_script_id.execution_mode"],
    store=True,
    help="Execution mode:\n"
    "- 'Odooly': Uses the Odooly client. Better for simple scripts.\n"
    "- 'Odoo Shell': Runs in an Odoo shell. Better for complex scripts.",
)
dev_execution_mode = fields.Selection(
    UL_EXECUTION_MODES,
    required=True,
    default="odooly",
    help="Execution mode to use when testing and approving the dev script.",
)

335 

def _compute_computed_help_info(self):
    """Copy the shared code-help HTML onto every record.

    The HTML is read once from the ``res.config.settings.code_help_html``
    system parameter, falling back to an empty string.
    """
    help_html = (
        self.env["ir.config_parameter"]
        .sudo()
        .get_param("res.config.settings.code_help_html", default="")
    )
    for line in self:
        line.computed_help_info = help_html

341 

def _compute_ul_avg_duration(self):
    """Set ``ul_avg_duration`` to the mean ``duration`` of each line's runs in state ``done``.

    Lines with no matching run, and recordsets without ids, get ``0.0``.
    """
    if not self.ids:
        self.ul_avg_duration = 0.0
        return
    # Flush pending changes on the run model before reading it with raw SQL.
    self.env["saas.upgrade.line.request.run"].flush_model()
    query = SQL(
        """
        SELECT upgrade_line_id, AVG(duration) as avg
        FROM saas_upgrade_line_request_run
        WHERE upgrade_line_id IN %(ids)s AND state = 'done'
        GROUP BY upgrade_line_id
        """,
        ids=tuple(self.ids),
    )
    averages = {}
    for row in self.env.execute_query_dict(query):
        averages[row["upgrade_line_id"]] = row["avg"] or 0.0
    for line in self:
        line.ul_avg_duration = averages.get(line.id, 0.0)

362 

@api.depends_context("ticket_request_id")
def _compute_request_and_request_run(self):
    """Derive ``request_id``, ``run_info_id`` and ``placeholders`` from the context.

    With a ``ticket_request_id`` in the context, the request is browsed, the
    run info is the request's run entry for this line, and the placeholders
    are read from that run. Without it, all three fields are emptied.
    """
    request_model = self.env["helpdesk.ticket.upgrade.request"]
    context_request_id = self.env.context.get("ticket_request_id", False)
    for line in self:
        if not context_request_id:
            line.request_id = request_model
            line.run_info_id = False
            line.placeholders = False
            continue
        line.request_id = request_model.browse(context_request_id)
        line.run_info_id = line.request_id.upgrade_line_request_run_ids.filtered(
            lambda run, line=line: run.upgrade_line_id == line
        )
        line.placeholders = line.run_info_id.placeholders

378 

@api.depends_context("ticket_request_id")
def _compute_shell_command(self):
    """Build the Odoo-shell snippet for the request currently in context.

    The value is derived from ``request_id``, which is itself computed from
    the ``ticket_request_id`` context key. That key is therefore the one
    declared as the context dependency; ``request_id`` is a field, not a
    context key. Records without a request get an empty command.
    """
    for rec in self:
        upgrade_request = rec.request_id
        if not upgrade_request:
            rec.shell_command = ""
            continue
        # No upgraded database on the request -> tell the eval context not to build a new client.
        no_new_client = not upgrade_request.with_upgraded_database
        rec.shell_command = dedent(
            SHELL_COMMAND_TEMPLATE.format(upgrade_request.id, rec.id, no_new_client)
        ).strip()

390 

@api.depends("script", "dev_script")
def _compute_script_diff(self):
    """Render the diff between the approved ``script`` and ``dev_script``.

    Missing scripts are treated as empty text. The dark colour scheme is used
    when the current HTTP request carries a ``color_scheme=dark`` cookie.
    """
    for line in self:
        approved_side = [line.script or "", "Old Script"]
        draft_side = [line.dev_script or "", "New Script"]
        # ``request and ...`` guards against there being no usable HTTP request.
        wants_dark = request and request.httprequest.cookies.get("color_scheme") == "dark"
        line.script_diff = get_diff(
            approved_side,
            draft_side,
            custom_style=False,
            dark_color_scheme=wants_dark,
        )

402 

@api.depends("upgrade_type_ids")
def _compute_allowed_from_major_version_ids(self):
    """Compute which source version groups may be selected on the line.

    With upgrade types set, only their ``from_major_version_ids`` are allowed;
    otherwise every version group is allowed.
    """
    every_version_id = self.env["saas.odoo.version.group"].search([]).ids
    for line in self:
        line.allowed_from_major_version_ids = (
            line.mapped("upgrade_type_ids.from_major_version_ids")
            if line.upgrade_type_ids
            else every_version_id
        )

411 

@api.depends("from_major_version_ids")
def _compute_filter_by_version(self):
    """Show "selected" when source versions are set on the line, "all" otherwise."""
    for line in self:
        line.filter_by_version = "selected" if line.from_major_version_ids else "all"

419 

@api.depends("message", "dev_message")
def _compute_messages_empty(self):
    """Flag whether ``message`` and ``dev_message`` are unset or empty HTML."""
    for line in self:
        approved_html = line.message
        draft_html = line.dev_message
        line.is_message_empty = not approved_html or is_html_empty(approved_html)
        line.is_dev_message_empty = not draft_html or is_html_empty(draft_html)

425 

@api.depends("name", "script_version")
def _compute_display_name(self):
    """Prefix the name with ``(V-<n>)`` once the line has a positive script version."""
    for line in self:
        version = line.script_version
        has_version = version and version > 0
        line.display_name = f"(V-{version}) {line.name}" if has_version else line.name

433 

def _inverse_filter_by_version(self):
    """Inverse of ``filter_by_version``: choosing "all" clears the selected source versions.

    Choosing "selected" changes nothing here; the versions themselves are
    edited through ``from_major_version_ids``. The method is registered as the
    field's ``inverse``, so it carries no ``@api.depends``: that decorator
    declares dependencies of compute methods.
    """
    for rec in self.filtered(lambda x: x.filter_by_version == "all"):
        rec.from_major_version_ids = False

438 

@api.onchange("module_ids")
def module_version_id_adhoc_product(self):
    """Default the product from the first selected module when none is set yet."""
    for line in self:
        if line.adhoc_product_id or not line.module_ids:
            continue
        line.adhoc_product_id = line.module_ids[0].adhoc_product_id

444 

@api.onchange("upgrade_type_ids")
def _onchange_upgrade_type_ids(self):
    """Drop selected source versions that the chosen upgrade types do not allow.

    Only lines that have both upgrade types and selected versions are touched;
    their selection is intersected with ``allowed_from_major_version_ids``.
    """
    for line in self:
        if line.upgrade_type_ids and line.from_major_version_ids:
            line.from_major_version_ids = (
                line.from_major_version_ids & line.allowed_from_major_version_ids
            )

453 

@api.constrains("sequence", "my_dependencies_ids")
def _check_sequence_by_dependencies(self):
    """Ensure every line runs strictly after the lines it depends on.

    Constraint methods receive a recordset, so each record is checked on its
    own; reading ``self.sequence`` directly fails as soon as several lines
    are validated together.

    :raises ValidationError: listing the dependencies whose sequence is
        greater than or equal to the line's sequence.
    """
    for line in self:
        message = "".join(
            f"{dependency.name}: Sequence {dependency.sequence}\n"
            for dependency in line.my_dependencies_ids
            if line.sequence <= dependency.sequence
        )
        if message:
            raise ValidationError(
                f"The sequence of this UL cannot be less than or equal to the sequences of its dependencies: \n {message}"
            )

464 

@api.constrains("sequence", "depends_on_me_ids")
def _check_sequence_by_reverse_dependencies(self):
    """Ensure every line runs strictly before the lines that depend on it.

    Constraint methods receive a recordset, so each record is checked on its
    own; reading ``self.sequence`` directly fails as soon as several lines
    are validated together.

    :raises ValidationError: listing the dependent lines whose sequence is
        lower than or equal to the line's sequence.
    """
    for line in self:
        message = "".join(
            f"{dependent.name}: Sequence {dependent.sequence}\n"
            for dependent in line.depends_on_me_ids
            if line.sequence >= dependent.sequence
        )
        if message:
            raise ValidationError(
                f"The sequence of this UL cannot be greater than or equal to the sequences of ULs that depend on it: \n {message}"
            )

475 

@api.constrains("new_filter_domain", "old_filter_domain")
def check_filter_domain_value(self):
    """Validate that both filter domains can be evaluated with ``safe_eval``.

    Each non-empty domain is evaluated against ``_get_filters_eval_context()``.
    The original exception is chained onto the ``ValidationError`` so the
    traceback keeps the real cause.

    :raises ValidationError: when a domain cannot be evaluated.
    """
    for rec in self:
        if rec.old_filter_domain:
            try:
                safe_eval.safe_eval(rec.old_filter_domain, rec._get_filters_eval_context())
            except Exception as e:
                raise ValidationError(
                    _("Error in 'Old Filter Domain' for script '%s':\n%s") % (rec.name, str(e))
                ) from e
        if rec.new_filter_domain:
            try:
                safe_eval.safe_eval(rec.new_filter_domain, rec._get_filters_eval_context())
            except Exception as e:
                raise ValidationError(
                    _("Error in 'New Filter Domain' for script '%s':\n%s") % (rec.name, str(e))
                ) from e

489 

@api.constrains("upgrade_ticket_ids", "upgrade_type_ids")
def _check_upgrade_line_usage(self):
    """Reject lines that are attached to neither a ticket nor an upgrade type.

    :raises ValidationError: when both relations are empty.
    """
    for line in self:
        if line.upgrade_ticket_ids or line.upgrade_type_ids:
            continue
        raise ValidationError(_("An upgrade line must be used in either tickets or upgrade types"))

495 

def copy_data(self, default=None):
    """Prepare the values for duplicating upgrade lines.

    The copy gets a "(copy)" suffix on its name, starts with the approved
    script as its dev script and, when the line adds a customer note, carries
    the title and message over as draft content.

    Records and value dicts are paired one to one, so duplicating several
    lines at once no longer reads fields on a multi-record ``self``. An empty
    title stays empty instead of becoming "False (copy)".

    :param default: optional overrides forwarded to ``super().copy_data``.
    :return: the list of value dicts, one per copied record.
    """
    vals_list = super().copy_data(default=default)
    for line, vals in zip(self, vals_list):
        with_note = line.add_customer_note
        vals.update(
            {
                "name": "%s (copy)" % line.name,
                "title": "%s (copy)" % line.title if with_note and line.title else False,
                "dev_script": line.script,
                "dev_message": line.message if with_note else False,
            }
        )
    return vals_list

508 

def write(self, vals):
    """Write ``vals`` and keep the related customer notes in sync.

    * When ``upgrade_type_ids`` is written: notes whose ticket targets a
      version group outside the line's upgrade types are deleted, and all
      notes are deleted if the types were cleared.
    * When ``add_customer_note`` is written: notes are archived or unarchived
      to follow the flag.
    """
    res = super().write(vals)
    types_written = "upgrade_type_ids" in vals
    note_flag_written = "add_customer_note" in vals
    for line in self:
        if types_written:
            if line.upgrade_type_ids:
                allowed_versions = set(line.upgrade_type_ids.mapped("target_odoo_version_group_id"))
                stale_notes = line.customer_note_ids.filtered(
                    lambda note: note.ticket_id.upgrade_type_id.target_odoo_version_group_id
                    not in allowed_versions
                )
                if stale_notes:
                    stale_notes.unlink()
            else:
                # No upgrade type left: every related customer note goes away.
                line.customer_note_ids.unlink()

        if note_flag_written:
            if line.add_customer_note:
                line.customer_note_ids.action_unarchive()
            else:
                line.customer_note_ids.action_archive()
    return res

532 

def action_archive(self):
    """Archive the lines, deleting their customer notes and resetting them to draft.

    Notes are removed with ``active_test=False`` and ``sudo()`` so archived
    notes are included. The result of the parent implementation is returned
    so callers get whatever ``super().action_archive()`` produces.
    """
    for rec in self:
        rec.with_context(active_test=False).sudo().customer_note_ids.unlink()
        rec.state = "draft"
    return super().action_archive()

539 

def action_get_script_versions(self) -> dict:
    """Return the script-versions window action restricted to this line."""
    self.ensure_one()
    action = self.env["ir.actions.act_window"]._for_xml_id("saas_provider_upgrade.action_saas_upgrade_line_script")
    action["domain"] = [("upgrade_line_id", "=", self.id)]
    return action

546 

def action_get_running_status(self) -> dict:
    """Return the request-run window action showing this line's runs that have a request."""
    self.ensure_one()
    xml_id = "saas_provider_upgrade.action_saas_upgrade_line_request_run"
    action = self.env["ir.actions.act_window"]._for_xml_id(xml_id)
    only_this_line = ["upgrade_line_id", "=", self.id]
    with_request = ["request_id", "!=", False]
    action["domain"] = [only_this_line, with_request]
    return action

557 

def action_check_filter_domain(self) -> dict | bool:
    """Test one of the filter domains against the matching request database.

    The ``field_type`` context key selects the domain: ``old_filter_domain``
    is checked on the request's original database, ``new_filter_domain`` on
    the upgraded one.

    :return: a notification action on success, or ``False`` when there is no
        request in context or ``field_type`` is missing or unknown.
    """
    self.ensure_one()
    field_name = self.env.context.get("field_type", False)
    if not field_name or not self.request_id:
        return False

    database_field_by_filter = {
        "old_filter_domain": "original_database_id",
        "new_filter_domain": "upgraded_database_id",
    }
    database_field = database_field_by_filter.get(field_name)
    if database_field is None:
        return False

    db_client = self.request_id[database_field].get_client_cluster()
    self._check_filter_domain(db_client, getattr(self, field_name))

    return self.build_display_notification_action(
        self.env._("Filter tested successfully"),
        self.env._("The filter has been tested and its result was successful"),
    )

577 

def action_get_requests(self) -> dict:
    """Open the manual test requests on which this line could be run.

    Requests must have aim ``test`` and not be automatic, have the database
    required by the line type, and match the line's upgrade types and
    tickets when those are set. When modules are defined, requests are
    further filtered on the modules of the ticket's main database, following
    ``module_check_logic`` (all / any).
    """
    self.ensure_one()
    domain = Domain("aim", "=", "test") & Domain("automatic", "=", False)
    required_db = UL_TYPE_DB_REQUIREMENTS.get(self.type, None)
    if required_db == "with_original_db":
        domain &= Domain("with_original_database", "=", True)
    if required_db == "with_upgraded_db":
        domain &= Domain("with_upgraded_database", "=", True)

    if self.upgrade_type_ids:
        domain &= Domain("upgrade_type_id", "in", self.upgrade_type_ids.ids)
    if self.upgrade_ticket_ids:
        domain &= Domain("ticket_id", "in", self.upgrade_ticket_ids.ids)

    requests = self.env["helpdesk.ticket.upgrade.request"].search(domain)
    if self.module_ids:
        wanted_modules = set(self.module_ids.mapped("name"))

        def installed_modules(upgrade_request):
            return set(upgrade_request.ticket_id.main_database_id.module_ids.mapped("name"))

        logic = self.module_check_logic
        if logic == "all":
            requests = requests.filtered(lambda req: wanted_modules.issubset(installed_modules(req)))
        elif logic == "any":
            requests = requests.filtered(lambda req: bool(wanted_modules.intersection(installed_modules(req))))

    action = self.env["ir.actions.act_window"]._for_xml_id("saas_provider_upgrade.action_ticket_upgrade_request")
    action["context"] = {"by_upgrade_line": True, "upgrade_line_id": self.id}
    action["domain"] = [("id", "in", requests.ids)]
    return action

610 

def action_approve_script_on_background(self) -> dict:
    """Check the line, then prepare a dev run flagged for approval.

    Requires a non-empty description and tests every filter domain that is
    set before calling ``_prepare_run`` with ``approve`` and ``test_dev`` in
    the context.

    :raises UserError: when the description is empty.
    """
    self.ensure_one()
    if is_html_empty(self.description):
        raise UserError(_("The description must be complete to approve the script!"))
    for filter_field in ("old_filter_domain", "new_filter_domain"):
        if self[filter_field]:
            self.with_context(test_dev=True, field_type=filter_field).action_check_filter_domain()
    return self.with_context(approve=True, test_dev=True)._prepare_run()

620 

def action_approve_script(self):
    """Promote ``dev_script`` to a new approved script version.

    A ``saas.upgrade.line.script`` record is created with the next version
    number and the dev execution mode. The line then points at it, its dev
    script is cleared and its state becomes ``approved``.

    :raises UserError: when there is no dev script to approve.
    """
    self.ensure_one()
    self._validate_ops()
    draft_script = self.dev_script
    if not draft_script:
        raise UserError(_("The script cannot be empty when approving!"))

    version_vals = {
        "script": draft_script,
        "version": self.script_version + 1,
        "upgrade_line_id": self.id,
        "execution_mode": self.dev_execution_mode,
    }
    approved_version = self.env["saas.upgrade.line.script"].create(version_vals)
    self.write(
        {
            "actual_script_id": approved_version.id,
            "dev_script": False,
            "state": "approved",
        }
    )

646 

def action_approve_message(self):
    """Promote ``dev_message`` to the approved customer note message.

    The dev message is copied into ``message`` and cleared, the line is marked
    ``approved`` and a chatter note records the approval. The method reads
    ``self.dev_message``, so it enforces a single record like the other
    approve and copy actions.
    """
    self.ensure_one()
    self.write(
        {
            "message": self.dev_message,
            "dev_message": "",
            "state": "approved",
        }
    )
    self.message_post(body=self.env._("Customer Note message approved"))

656 

def action_copy_script(self):
    """Load the approved script and its execution mode into the dev fields."""
    self.ensure_one()
    self.update(
        {
            "dev_script": self.script,
            "dev_execution_mode": self.execution_mode,
        }
    )

661 

def action_copy_message(self):
    """Load the approved customer note message into ``dev_message``."""
    self.ensure_one()
    self.update({"dev_message": self.message})

665 

def action_to_correct(self):
    """Mark the line as 'To Correct' and notify the people responsible for the product.

    The product owner and the technical expert of the line's product are
    notified through a chatter note, provided they are linked to a user.
    No message is posted when neither has one.
    """
    self.ensure_one()
    self.state = "to_correct"

    product = self.adhoc_product_id
    responsible_users = (
        product.product_owner_id.user_id,
        product.technical_expert_id.user_id,
    )
    partner_ids = [user.partner_id.id for user in responsible_users if user]
    if not partner_ids:
        return

    self.message_post(
        body="This script need fixes, please check and approve again if pass the test.",
        partner_ids=partner_ids,
        subtype_xmlid="mail.mt_note",
        message_type="notification",
    )

686 

def action_set_draft(self):
    """Send the upgrade line back to the 'Draft' state."""
    self.ensure_one()
    self.update({"state": "draft"})

693 

def action_approve(self):
    """Mark the upgrade line as 'Approved' directly, without running a test."""
    self.ensure_one()
    self.update({"state": "approved"})

700 

def action_deactivate_script(self):
    """Add this line to the deactivated scripts of the request in context.

    :raises UserError: when there is no request, or when the request is
        automatic or currently running.
    """
    self.ensure_one()
    upgrade_request = self.request_id
    if not upgrade_request:
        raise UserError(_("Cannot deactivate a script without a request!"))
    if upgrade_request.automatic or upgrade_request.status == "running":
        raise UserError(_("Cannot deactivate a script in a running request!"))
    # (4, id) links the record without touching the existing links.
    upgrade_request.deactivated_script_ids = [(4, self.id)]

708 

def action_prepare_run(self) -> dict | None:
    """Validate the line and hand it to ``_prepare_run``.

    Whether the approved script or the dev script runs depends on the
    context.

    :raises RedirectWarning: when no request is in context; the warning
        offers a button to the candidate requests.
    :raises UserError: when the request is automatic.
    """
    self.ensure_one()
    upgrade_request = self.request_id
    if not upgrade_request:
        raise RedirectWarning(
            message=_("A request is needed to run the script!"),
            action=self.action_get_requests(),
            button_text=_("See Requests"),
        )
    if upgrade_request.automatic:
        raise UserError(self.env._("Cannot run the script on an automatic request!"))

    self._validate_ops()
    return self._prepare_run()

730 

def run(self, request: "UpgradeRequest") -> Markup | None:
    """Run this upgrade line on ``request``.

    Steps before the script is executed:
    * check that the database required by the request state exists;
    * skip lines whose run is already done, unless ``test_dev`` is in context;
    * mark this line's previous warnings on the request as solved.

    :param request: The request on which the line runs
    :return: A message to send to the user or None if no message is needed.
    :raises UserError: when the required database is missing or the
        execution mode is unknown.
    """
    self.ensure_one()
    test_dev = self.env.context.get("test_dev", False)
    request_phase = MAP_REQUEST_STATES.get(request.state, False)

    if request_phase == "before_odoo" and not request.original_database_id:
        raise UserError(self.env._("There is no original database!"))
    if request_phase == "after_odoo" and not request.upgraded_database_id:
        raise UserError(self.env._("There is no upgraded database!"))

    # The request id in context lets ``run_info_id`` resolve to this request's run.
    line = self.with_context(ticket_request_id=request.id)
    run_info = line.run_info_id or line.env["saas.upgrade.line.request.run"]
    if not test_dev and run_info.state == "done":
        return None

    # Mark previous warnings as solved
    previous_warnings = request.upgrade_line_request_log_entry_ids.filtered(
        lambda entry: entry.upgrade_line_id == line and entry.type == "warning"
    )
    previous_warnings.write({"solved": True})

    # Manage freezed scripts
    script_version = line._get_script_version()
    script = script_version.script
    execution_mode = script_version.execution_mode
    run_info.start(script_version)

    if execution_mode == "odooly":
        return line._run_odooly(request, run_info, script)
    if execution_mode == "odoo_shell":
        return line._run_odoo_shell(request, run_info, script)
    _logger.error("Invalid execution mode '%s' for upgrade line '%s'", execution_mode, line.name)
    raise UserError(_("Invalid execution mode for upgrade line '%s'") % line.name)

776 

def _run_odooly(self, request: "UpgradeRequest", run_info: "UpgradeRequestRun", script: str) -> Markup | None:
    """
    Runs the script using odooly connections to the client clusters.

    The script is executed with ``safe_eval`` inside the request's eval
    context. It may set ``result`` and ``breaks`` in that context.

    :param request: The request where the script will run
    :param run_info: The run info record
    :param script: Source code of the script to execute
    :return: The result of the script execution
    :raises UpgradeLineException: when the script raises; the cursor is rolled
        back first and the original exception is chained as the cause.
    :raises UserError: when the script sets ``breaks`` outside ``test_dev``.
    """
    self.ensure_one()
    test_dev = self.env.context.get("test_dev", False)
    eval_context: dict = {}
    if self.type != "0_on_create":
        self.env["ir.cron"]._commit_progress()  # Commit to reflect the run_info state

    with request._get_eval_context(eval_context, upgrade_line=self):
        try:
            safe_eval.safe_eval(script, eval_context, mode="exec")
        except Exception as e:
            self.env.cr.rollback()
            lineno = self._extract_script_lineno(e)
            error = str(e)
            raise UpgradeLineException(self.env, request, self, error, lineno) from e

        result = eval_context.pop("result", None)
        breaks = eval_context.pop("breaks", None)
        if breaks and not test_dev:
            raise UserError(breaks)

        self._process_customer_note(eval_context, request, run_info)
        return run_info.finish(result=breaks or result)  # 'breaks' has priority over 'result' if it's defined

807 

def _run_odoo_shell(self, request: "UpgradeRequest", run_info: "UpgradeRequestRun", script: str):
    """
    Schedules the script as a 'scriptupgrade' task on the database of the request
    (per the original documentation, it runs as a k8s job in the client cluster).

    The upgraded database is used when the UL type is mapped to 'with_upgraded_db' in
    UL_TYPE_DB_REQUIREMENTS; otherwise the original database is used.

    :param request: The request where the script will run
    :param run_info: The run info record
    :param script: Source code of the script to execute
    :raises UserError: If the selected database is not set on the request
    """
    self.ensure_one()
    source_db = request.original_database_id
    target_db = request.upgraded_database_id
    needs_upgraded_db = UL_TYPE_DB_REQUIREMENTS.get(self.type) == "with_upgraded_db"
    job_db = target_db if needs_upgraded_db else source_db
    if not job_db:
        raise UserError(self.env._("There is no database to run the job!"))

    # The whole context is forwarded to the task as extra keyword arguments
    job_db._schedule_task(
        "scriptupgrade",
        run_info_id=run_info.id,
        script=script,
        state_on_success=job_db.state,
        **self.env.context,
    )

825 

826 def _applies(self, old_client: Client, new_client: Client) -> bool: 

827 """ 

828 Checks if the script applies to the request (only if script type is not "after_done") 

829 We use _check_filter_domain to evaluate the old and new filter domains. 

830 :param old_client: Client connected to the original database (resolved externally to avoid repeated calls) 

831 :param new_client: Client connected to the upgraded database (resolved externally to avoid repeated calls) 

832 :return: True if the script applies, False otherwise. 

833 """ 

834 self.ensure_one() 

835 # Always run the "after_done" scripts 

836 if self.type == "6_after_done" or not old_client: 

837 return True 

838 

839 # We check old and new conditions if they are defined 

840 db_requirement = UL_TYPE_DB_REQUIREMENTS.get(self.type, None) 

841 checks = [(old_client, self.old_filter_domain)] 

842 if db_requirement == "with_upgraded_db": 

843 checks.append((new_client, self.new_filter_domain)) 

844 

845 for client, filter_domain in checks: 

846 if not filter_domain: 

847 continue 

848 try: 

849 if not self._check_filter_domain(client, filter_domain): 

850 return False 

851 except Exception as e: 

852 raise UpgradeLineException(self.env, self.request_id, self, str(e)) 

853 return True 

854 

855 def _prepare_run(self) -> dict | None: 

856 """ 

857 Method that creates a 'saas.upgrade.line.request.run' record on 'pending' state 

858 and triggers the cron to process it 

859 """ 

860 for rec in self: 

861 rec._create_run_info_id() 

862 res = rec.run_info_id.enqueue() 

863 return res 

864 

865 def _get_script_version(self) -> "UpgradeLineScript": 

866 """ 

867 Function used to get the script to use 

868 

869 Condition: Active production freeze is True, we are in a production request and 

870 UL type is not in UL_TYPES_FREEZE_IGNORES 

871 

872 1. If we are testing, return a temporary record with the dev_script and dev_execution_mode 

873 2. If the UL does not have an script, and generates a customer note, we return a temporary record 

874 with an empty script and 'odooly' as execution mode. 

875 3. If 'condition' is True, we return the version to use, that is located in 'upgrade_lines_version_to_use' map 

876 4. If 'condition' is False, we return the last version of the script 

877 

878 :return: The script register to execute 

879 """ 

880 UpgradeLineScript = self.env["saas.upgrade.line.script"] 

881 if self.env.context.get("test_dev", False): 

882 return UpgradeLineScript.new( 

883 { 

884 "script": self.dev_script, 

885 "execution_mode": self.dev_execution_mode, 

886 "upgrade_line_id": self.id, 

887 } 

888 ) 

889 

890 actual_script = self.actual_script_id 

891 if not actual_script and self.add_customer_note: 

892 return UpgradeLineScript.new( 

893 { 

894 "script": "", 

895 "execution_mode": "odooly", 

896 "upgrade_line_id": self.id, 

897 } 

898 ) 

899 

900 if self.run_type == "production": 

901 return actual_script 

902 

903 # The request is only required at this point 

904 request = self.request_id 

905 if not request: 905 ↛ 906line 905 didn't jump to line 906 because the condition on line 905 was never true

906 raise UserError(_("Cannot determine script version without a request!")) 

907 

908 ul_type = self.type 

909 condition = ( 

910 request.active_production_freeze 

911 and request.aim == "production" 

912 and not self._production_freeze_ignore(ul_type) 

913 ) 

914 match condition: 

915 case True: 

916 # 'upgrade_line_versions_to_use' is a mapping in the request that has 

917 # the versions to use for each UL, with the 

918 # following structure: 

919 # 

920 # { 

921 # "<ul_id>": <version>, 

922 # ... 

923 # } 

924 upgrade_line_versions_to_use = request._get_upgrade_line_versions_to_use() 

925 version_to_use = upgrade_line_versions_to_use.get(str(self.id), -1) 

926 script_version = self.upgrade_line_script_version_ids.filtered(lambda x: x.version == version_to_use) 

927 if not script_version: 

928 raise UserError( 

929 self.env._("There must be a version for UL %s in the request %s!") % (self.id, request.id) 

930 ) 

931 case False: 931 ↛ 934line 931 didn't jump to line 934 because the pattern on line 931 always matched

932 script_version = actual_script 

933 

934 return script_version 

935 

936 def _create_run_info_id(self): 

937 """ 

938 Used to create a running status associated with the UL and the request 

939 """ 

940 if not self.request_id: 

941 raise UserError(_("Cannot create run info without a request!")) 

942 

943 if self.run_info_id: 

944 return 

945 self.run_info_id = self.env["saas.upgrade.line.request.run"].create( 

946 { 

947 "upgrade_line_id": self.id, 

948 "request_id": self.request_id and self.request_id.id or False, 

949 } 

950 ) 

951 

952 def _process_customer_note(self, context: dict, request: "UpgradeRequest", run_info: "UpgradeRequestRun"): 

953 """ 

954 Method to process the customer note after running the script. 

955 In test mode, it only prepares the placeholders. 

956 Otherwise, it generates the customer note. 

957 

958 :param context: Context with variables from the script execution 

959 :param request: The upgrade request 

960 :param run_info: The run info record 

961 """ 

962 test_dev = self.env.context.get("test_dev", False) 

963 message = self.dev_message if test_dev else self.message 

964 if request.aim == "test" and self.add_customer_note and message: 

965 used_vars = self.extract_qweb_variables(message) 

966 placeholders = ( 

967 {k: v for k, v in context.items() if (bool(v) and self.is_serializable(v) and k in used_vars)} 

968 if used_vars 

969 else {} 

970 ) 

971 if test_dev: 

972 run_info.write({"placeholders": placeholders}) 

973 else: 

974 self.env["helpdesk.ticket.customer_note"].generate(self, request, placeholders) 

975 

def _check_filter_domain(self, client: Client, domain_to_check: str) -> bool:
    """
    Checks whether the filter domain returns any record in the database.

    The filter domain is a string that evaluates to a mapping of model names to
    domains, for example:

    filter_domains = {
        "res.partner": "[('customer', '=', True)]",
        ...
    }

    With 'no_raise_on_filter_error' in the context, evaluation or search errors are
    logged as warnings and treated as "no match" instead of being raised.

    :param client: Client connected to the database to check
    :param domain_to_check: Domain to check
    :return: True if any of the domains returns records, False otherwise
    :raises UserError: If the check fails and 'no_raise_on_filter_error' is not set
    """
    try:
        domains_by_model = safe_eval.safe_eval(domain_to_check, self._get_filters_eval_context())
        # Stops at the first model whose domain matches at least one record
        return any(client.env[model].search_count(domain) for model, domain in domains_by_model.items())
    except Exception as e:
        if not self.env.context.get("no_raise_on_filter_error"):
            raise UserError(
                self.env._("An error was logged with a filter domain in the script '%s'.\n Error message:\n %s")
                % (self.name, e)
            )
        _logger.warning("Filter domain validation failed for script '%s': %s", self.name, e)
        return False

1005 

1006 def _validate_ops(self): 

1007 """ 

1008 Validate that the script does not contain forbidden operations. 

1009 This is to prevent malicious code execution or unintended operations. 

1010 """ 

1011 types = [x[0] for x in self._fields["type"].selection] 

1012 forbidden_patterns = [ 

1013 (lambda _: True, r"^(?!\s*#)(.*\b(new|old_database\.env)\b.*)$"), 

1014 (lambda _: True, r"^(?!\s*#)(.*request\.env.*)$"), 

1015 (lambda r: types.index(r.type) <= types.index("3_pre-adhoc"), r"^(?!\s*#)(.*new_client.*)$"), 

1016 ] 

1017 

1018 script_version = self._get_script_version() 

1019 script = script_version.script 

1020 execution_mode = script_version.execution_mode 

1021 msg = None 

1022 match execution_mode: 

1023 case "odooly": 

1024 msg = safe_eval.test_python_expr(expr=script, mode="exec") 

1025 case "odoo_shell": 

1026 # Use compile in 'odoo_shell' to avoid import errors 

1027 try: 

1028 compile(script, "<string>", "exec") 

1029 except Exception as e: 

1030 msg = str(e) 

1031 case _: 

1032 raise UserError(_("Invalid execution mode for upgrade line '%s'") % self.name) 

1033 

1034 if msg: 

1035 raise ValidationError(_("Error validating script: %s") % msg) 

1036 

1037 for condition, pattern in forbidden_patterns: 

1038 if not condition(self): 

1039 continue 

1040 match = re.search(pattern, script, re.IGNORECASE) 

1041 if match: 

1042 operation = match.group(0) 

1043 raise UserError(_("Forbidden operation on the execution: '%s'") % operation) 

1044 

1045 def _extract_script_lineno(self, exc: BaseException) -> int | None: 

1046 """ 

1047 Walk the full exception chain (exc → __cause__ → __context__) and collect 

1048 all frames whose filename is '' or '<string>' (i.e. from safe_eval). 

1049 Returns the line number of the *innermost* such frame, or None if not found. 

1050 

1051 :param exc: The exception to analyze 

1052 :return: The line number of the innermost frame from safe_eval, or None if not found 

1053 """ 

1054 lineno = None 

1055 seen = set() 

1056 current: BaseException | None = exc 

1057 while current is not None and id(current) not in seen: 

1058 seen.add(id(current)) 

1059 for frame in traceback.extract_tb(current.__traceback__): 

1060 filename, mb_lineno, _, _ = frame 

1061 if filename in ("", "<string>"): 

1062 lineno = mb_lineno # keep updating -> gets the innermost frame 

1063 current = current.__cause__ or current.__context__ 

1064 return lineno 

1065 

1066 ############################################################################# 

1067 

1068 # Function to populate demo data 

def set_last_script_version(self):
    """
    Sets, on every upgrade line, the actual script to the first
    'saas.upgrade.line.script' record found for that line (used to populate demo data).
    """
    for line in self:
        line_domain = [("upgrade_line_id", "=", line.id)]
        line.actual_script_id = self.env["saas.upgrade.line.script"].search(line_domain, limit=1)