Coverage for ingadhoc-odoo-saas-adhoc / saas_provider_upgrade / models / saas_upgrade_line.py: 42%
491 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
1##############################################################################
2# For copyright and license notices, see __manifest__.py file in module root
3# directory
4##############################################################################
5import logging
6import re
7import traceback
8from textwrap import dedent
9from typing import TYPE_CHECKING
11from markupsafe import Markup
12from odoo import _, api, fields, models
13from odoo.exceptions import RedirectWarning, UserError, ValidationError
14from odoo.fields import Domain
15from odoo.http import request
16from odoo.tools import SQL, is_html_empty, safe_eval
17from odoo.tools.misc import get_diff
18from odooly import Client
20from ..constants import MAP_REQUEST_STATES, UL_EXECUTION_MODES, UL_TYPE_DB_REQUIREMENTS
21from ..exceptions import UpgradeLineException
23SHELL_COMMAND_TEMPLATE = """
24request = env['helpdesk.ticket.upgrade.request'].browse({})
25upgrade_line = env['saas.upgrade.line'].browse({})
26eval_context = dict()
27with request.with_context(no_new_client={}, no_close_cursor=True)._get_eval_context(eval_context, upgrade_line=upgrade_line):
28 pass
29for key, value in eval_context.items():
30 globals()[key] = value
31"""
33if TYPE_CHECKING:
34 from saas_provider_upgrade.models.helpdesk_ticket_upgrade_request import (
35 HelpdeskTicketUpgradeRequest as UpgradeRequest,
36 )
37 from saas_provider_upgrade.models.saas_upgrade_line_request_run import (
38 SaasUpgradeLineRequestRun as UpgradeRequestRun,
39 )
40 from saas_provider_upgrade.models.saas_upgrade_line_script import (
41 SaasUpgradeLineScript as UpgradeLineScript,
42 )
44_logger = logging.getLogger(__name__)
47class SaasUpgradeLine(models.Model):
# --- Model metadata ----------------------------------------------------------
_name = "saas.upgrade.line"
_inherit = ["mail.thread", "mail.activity.mixin", "saas.provider.upgrade.util"]
_description = "Upgrade Lines"
_order = "type, sequence, name"

# --- Ordering and identification ---------------------------------------------
sequence = fields.Integer(
    copy=False,
    default=10,
    help="Sequence can be used to run an upgrade line before another of the same type",
)
customer_note_sequence = fields.Integer(
    copy=False,
    default=10,
    help="Customer notes are sorted by sequence and then by product. "
    "By default, all will have the same sequence, but you can prioritize by changing it here",
)
name = fields.Char(required=True)
# The numeric prefix of each key makes the default `_order` follow the
# upgrade pipeline stages.
type = fields.Selection(
    [
        ("0_on_create", "On Create"),
        ("1_evaluation", "Evaluation"),
        ("2_pre", "Pre Odoo"),
        ("3_pre-adhoc", "Pre Adhoc"),
        ("4_post", "Post Adhoc"),
        ("5_test", "Test & C. Notes"),
        ("6_after_done", "After Done"),
    ],
    required=True,
    tracking=True,
    change_default=True,
    help="The purpose and stage where the script will run depends on its type:\n"
    "* 0- On Create: runs before creating the request, intended to enforce constraints when creating a request. "
    "IMPORTANT: no client database is available yet.\n"
    "* 1- Evaluation: runs when the request is in draft, used to evaluate if everything is ready for an upgrade. "
    "Also runs when evaluating a database to check readiness. "
    "IMPORTANT: These scripts should NOT make changes to databases.\n"
    "* 2- Pre Odoo: used to make adjustments on the 'old' database before sending it to Odoo.\n"
    "* 3- Pre Adhoc: runs after restoring the database provided by Odoo and before running our upgrade "
    "(-u all). These scripts do not have 'new_client' to avoid waking the Odoo service until the next step. "
    "'new_run_sql' or 'new_run_openupgradelib_method' is usually used.\n"
    "* 4- Post Adhoc: runs after completing the upgrade of all modules, used for final database adjustments. "
    "After these scripts, the database is activated and missing modules are auto-installed (according to adhoc module classification).\n"
    "* 5- Test & C. Notes: used for automated tests and adding additional customer notes. "
    "IMPORTANT: These scripts should NOT make changes to databases.\n"
    "* 6- After Done: runs if the upgrade completes successfully.\n",
)
customer_note_validation_script = fields.Text(copy=False)

# --- Where the upgrade line applies ------------------------------------------
upgrade_ticket_ids = fields.Many2many(
    "helpdesk.ticket",
    help="Used as custom upgrade on these upgrade tickets",
    domain="[('upgrade_team', '=', True)]",
)
upgrade_type_ids = fields.Many2many(
    "saas.upgrade.type",
    "saas_upgrade_scripts_type_rel",
    "upgrade_line_id",
    "type_id",
)
old_filter_domain = fields.Char(
    help="If you want to evaluate not only that the module is installed, but that records exist in a model, "
    "add a dictionary where the key is the model and the value is the domain. "
    "If multiple elements are added, the warning will appear if there are records for at least one evaluation.",
    tracking=True,
)
new_filter_domain = fields.Char(
    help="If you want to evaluate not only that the module is installed, but that records exist in a model, "
    "add a dictionary where the key is the model and the value is the domain. "
    "If multiple elements are added, the warning will appear if there are records for at least one evaluation.",
    tracking=True,
)
module_ids = fields.Many2many(
    "adhoc.module",
    "adhoc_product_upgrade_line_rel",
    "upgrade_line_id",
    "module_id",
    help="If modules are defined, the action runs only if all modules are installed in the old database.",
    string="Modules",
)
from_major_version_ids = fields.Many2many(
    "saas.odoo.version.group",
    help="This script will only run for upgrades coming from these versions",
    string="From Odoo Projects",
    tracking=True,
)

# --- Customer note content ---------------------------------------------------
title = fields.Char(copy=False)
message = fields.Html(
    help="Message displayed to the client in the Customer Note",
    copy=False,
    prefetch=True,
    sanitize=False,
)
is_message_empty = fields.Boolean(
    help="Computed since message is HTML; used to determine if empty for some view conditions.",
    compute="_compute_messages_empty",
)
dev_message = fields.Html(
    help="Used to test the Customer Note message. Must be approved to apply.",
    copy=False,
    prefetch=True,
    sanitize=False,
)
is_dev_message_empty = fields.Boolean(
    help="Computed since dev_message is HTML; used to determine if empty for some view conditions.",
    compute="_compute_messages_empty",
)

# --- Product ownership -------------------------------------------------------
adhoc_product_id = fields.Many2one(
    "adhoc.product",
    required=True,
    change_default=True,
    domain=[("parent_id", "!=", False), ("type", "=", "horizontal")],
)
technical_team_id = fields.Many2one(related="adhoc_product_id.technical_team_id")
product_owner_id = fields.Many2one(related="adhoc_product_id.product_owner_id")

# --- Customer note flags and descriptions ------------------------------------
add_customer_note = fields.Boolean()
customer_note_validation = fields.Boolean(copy=False)
customer_note_upload_changes = fields.Boolean(copy=False)
force_reopen_customer_note = fields.Boolean(
    help="If set, customer notes generated from this upgrade line will be reopened if the content changes.",
)
active = fields.Boolean(tracking=True, default=True)
description = fields.Html(copy=False)
description_customer_note_validation = fields.Html(copy=False)

# --- Request-scoped information (computed from the context) -------------------
request_id = fields.Many2one(
    "helpdesk.ticket.upgrade.request",
    compute="_compute_request_and_request_run",
)
customer_note_ids = fields.One2many("helpdesk.ticket.customer_note", "upgrade_line_id")
upgrade_line_request_run_ids = fields.One2many(
    "saas.upgrade.line.request.run",
    "upgrade_line_id",
)
run_info_id = fields.Many2one(
    "saas.upgrade.line.request.run",
    help="Upgrade Line state for this request.\n"
    " - Done (duration): script finished, shows duration.\n"
    " - Running: script is currently running.\n"
    " - N/A: script does not apply to request databases.\n"
    " If script hasn't run yet or does not exist, remains empty.",
    compute="_compute_request_and_request_run",
)

# --- Workflow ----------------------------------------------------------------
state = fields.Selection(
    [
        ("draft", "Draft"),
        ("to_correct", "To Correct"),
        ("approved", "Approved"),
    ],
    copy=False,
    default="draft",
    tracking=True,
)
priority = fields.Selection(
    [
        ("0", "Low Priority"),
        ("1", "High Priority"),
    ],
    copy=False,
    default="0",
    tracking=True,
)

# --- Links to tickets and tasks ----------------------------------------------
helpdesk_ticket_ids = fields.Many2many(
    "helpdesk.ticket",
    "saas_upgrade_line_ticket_rel",
    "upgrade_line_id",
    "ticket_id",
    help="Tickets associated with the Upgrade Line.",
    string="Tickets",
)
project_task_ids = fields.Many2many(
    "project.task",
    "saas_upgrade_line_task_rel",
    "upgrade_line_id",
    "task_id",
    help="Tasks associated with the Upgrade Line.",
    string="Tasks",
)
# Boolean default must be the bool ``True``; the previous string "True" only
# worked because any non-empty string is truthy (so "False" would also have
# meant True).
allow_bypass = fields.Boolean(
    "Allow bypass",
    default=True,
    help="If checked, the script is skipped when creating the request",
)
# --- Logs --------------------------------------------------------------------
upgrade_line_request_log_ids = fields.One2many(
    "saas.upgrade.line.request.log",
    "upgrade_line_id",
)

# --- Version filtering -------------------------------------------------------
# Helper selector: computed from `from_major_version_ids`; its inverse clears
# that field when "all" is chosen.
filter_by_version = fields.Selection(
    [
        ("all", "All versions"),
        ("selected", "Specific versions"),
    ],
    compute="_compute_filter_by_version",
    inverse="_inverse_filter_by_version",
    string="Apply from",
)
allowed_from_major_version_ids = fields.Many2many(
    "saas.odoo.version.group",
    compute="_compute_allowed_from_major_version_ids",
)
run_type = fields.Selection(
    [("production", "Production"), ("test", "Test")],
    help="If empty, it will run in both Test and Production. Otherwise, it will run only in the selected environment.",
    string="Run in",
)

# --- Script versions ---------------------------------------------------------
actual_script_id = fields.Many2one(
    "saas.upgrade.line.script",
    copy=False,
)
upgrade_line_script_version_ids = fields.One2many(
    "saas.upgrade.line.script",
    "upgrade_line_id",
)
script_version = fields.Integer(
    help="Current script version",
    related="actual_script_id.version",
    tracking=True,
)
script = fields.Text(
    copy=False,
    related="actual_script_id.script",
    store=True,
)
dev_script = fields.Text(copy=False)
script_diff = fields.Html(
    compute="_compute_script_diff",
    sanitize_tags=False,
)

# --- Dependencies between upgrade lines --------------------------------------
# Both fields share one relation table with the columns swapped, so each is
# the mirror of the other.
my_dependencies_ids = fields.Many2many(
    "saas.upgrade.line",
    "saas_upgrade_line_dependencies_rel",
    "line_id",
    "related_line_id",
    copy=False,
    domain="[('type', '=', type)]",
)
depends_on_me_ids = fields.Many2many(
    "saas.upgrade.line",
    "saas_upgrade_line_dependencies_rel",
    "related_line_id",
    "line_id",
    copy=False,
    domain="[('type', '=', type)]",
)

# --- Miscellaneous computed information --------------------------------------
ul_avg_duration = fields.Float(compute="_compute_ul_avg_duration")
computed_help_info = fields.Html(
    compute="_compute_computed_help_info",
    readonly=True,
    store=False,
    string="Help Information",
)
placeholders = fields.Json(compute="_compute_request_and_request_run")
shell_command = fields.Text(
    help="Shell command to access the upgraded database environment via Odoo shell.",
    compute="_compute_shell_command",
)

# --- Execution settings ------------------------------------------------------
module_check_logic = fields.Selection(
    [("all", "All"), ("any", "Any")],
    required=True,
    default="all",
    help="Determines module validation logic:\n"
    "- 'All': All listed modules must be installed in the old database.\n"
    "- 'Any': At least one listed module must be installed in the old database.",
)
execution_mode = fields.Selection(
    help="Execution mode:\n"
    "- 'Odooly': Uses the Odooly client. Better for simple scripts.\n"
    "- 'Odoo Shell': Runs in an Odoo shell. Better for complex scripts.",
    related="actual_script_id.execution_mode",
    depends=["actual_script_id", "actual_script_id.execution_mode"],
    store=True,
)
dev_execution_mode = fields.Selection(
    UL_EXECUTION_MODES,
    help="Execution mode to use when testing and approving the dev script.",
    default="odooly",
    required=True,
)
def _compute_computed_help_info(self):
    """Copy the ``res.config.settings.code_help_html`` system parameter
    (empty string when unset) into ``computed_help_info`` of every record.
    """
    html = (
        self.env["ir.config_parameter"]
        .sudo()
        .get_param("res.config.settings.code_help_html", default="")
    )
    for line in self:
        line.computed_help_info = html
def _compute_ul_avg_duration(self):
    """Set ``ul_avg_duration`` to the average ``duration`` of the runs in
    state ``done`` of each upgrade line (0.0 when there is none).
    """
    line_ids = self.ids
    if not line_ids:
        # Nothing to query for: every record gets the neutral value.
        self.ul_avg_duration = 0.0
        return

    # Pending ORM changes on the runs must reach the database before raw SQL.
    self.env["saas.upgrade.line.request.run"].flush_model()
    query = SQL(
        """
        SELECT upgrade_line_id, AVG(duration) as avg
        FROM saas_upgrade_line_request_run
        WHERE upgrade_line_id IN %(ids)s AND state = 'done'
        GROUP BY upgrade_line_id
        """,
        ids=tuple(line_ids),
    )
    averages = {}
    for row in self.env.execute_query_dict(query):
        averages[row["upgrade_line_id"]] = row["avg"] or 0.0

    for line in self:
        line.ul_avg_duration = averages.get(line.id, 0.0)
@api.depends_context("ticket_request_id")
def _compute_request_and_request_run(self):
    """Resolve ``request_id``, ``run_info_id`` and ``placeholders`` from the
    ``ticket_request_id`` context key.

    Without that key the three fields are reset to empty values.
    """
    Request = self.env["helpdesk.ticket.upgrade.request"]
    context_request_id = self.env.context.get("ticket_request_id", False)

    if not context_request_id:
        for line in self:
            line.request_id = Request
            line.run_info_id = False
            line.placeholders = False
        return

    for line in self:
        line.request_id = Request.browse(context_request_id)
        request_runs = line.request_id.upgrade_line_request_run_ids
        # Keep only the run that belongs to this upgrade line.
        line.run_info_id = request_runs.filtered(lambda run: run.upgrade_line_id == line)
        line.placeholders = line.run_info_id.placeholders
@api.depends_context("ticket_request_id")
@api.depends("request_id")
def _compute_shell_command(self):
    """Build the Odoo-shell bootstrap snippet for the current request.

    The snippet is ``SHELL_COMMAND_TEMPLATE`` filled with the request id,
    the upgrade line id and a ``no_new_client`` flag that is True when the
    request has no upgraded database. Records without a request get an
    empty string.

    The value is derived from ``request_id``, which itself is computed from
    the ``ticket_request_id`` context key, so that is the key declared here
    (the former ``request_id`` context key is never read by this method).
    """
    for rec in self:
        upgrade_request = rec.request_id
        if not upgrade_request:
            rec.shell_command = ""
            continue
        no_new_client = not upgrade_request.with_upgraded_database
        rec.shell_command = dedent(
            SHELL_COMMAND_TEMPLATE.format(upgrade_request.id, rec.id, no_new_client)
        ).strip()
@api.depends("script", "dev_script")
def _compute_script_diff(self):
    """Render the HTML diff between the approved script and the dev script.

    The dark color scheme is used when the current HTTP request carries a
    ``color_scheme=dark`` cookie.
    """
    for line in self:
        use_dark = request and request.httprequest.cookies.get("color_scheme") == "dark"
        before = [line.script or "", "Old Script"]
        after = [line.dev_script or "", "New Script"]
        line.script_diff = get_diff(before, after, custom_style=False, dark_color_scheme=use_dark)
@api.depends("upgrade_type_ids")
def _compute_allowed_from_major_version_ids(self):
    """Versions selectable as origin: those of the chosen upgrade types, or
    every version group when no upgrade type is set.
    """
    every_version_ids = self.env["saas.odoo.version.group"].search([]).ids
    for line in self:
        line.allowed_from_major_version_ids = (
            line.mapped("upgrade_type_ids.from_major_version_ids")
            if line.upgrade_type_ids
            else every_version_ids
        )
@api.depends("from_major_version_ids")
def _compute_filter_by_version(self):
    """Set the selector to "selected" when origin versions are set, "all" otherwise."""
    for line in self:
        line.filter_by_version = "selected" if line.from_major_version_ids else "all"
@api.depends("message", "dev_message")
def _compute_messages_empty(self):
    """Flag whether ``message`` and ``dev_message`` are unset or empty HTML."""

    def _blank(html):
        # Unset values count as empty, as does markup with no content.
        return not html or is_html_empty(html)

    for line in self:
        line.is_message_empty = _blank(line.message)
        line.is_dev_message_empty = _blank(line.dev_message)
@api.depends("name", "script_version")
def _compute_display_name(self):
    """Prefix the name with ``(V-<n>)`` once a positive script version exists."""
    for line in self:
        has_version = line.script_version and line.script_version > 0
        line.display_name = f"(V-{line.script_version}) {line.name}" if has_version else line.name
def _inverse_filter_by_version(self):
    """Inverse of ``filter_by_version``: choosing "all" clears the explicit
    list of origin versions. Choosing "selected" changes nothing here.

    The former ``@api.depends`` decorator was dropped: dependencies are
    declared on compute methods, not on inverse methods.
    """
    for rec in self.filtered(lambda line: line.filter_by_version == "all"):
        rec.from_major_version_ids = False
@api.onchange("module_ids")
def module_version_id_adhoc_product(self):
    """When no product is set yet, take the product of the first module."""
    for line in self:
        if line.adhoc_product_id or not line.module_ids:
            continue
        line.adhoc_product_id = line.module_ids[0].adhoc_product_id
@api.onchange("upgrade_type_ids")
def _onchange_upgrade_type_ids(self):
    """Restrict ``from_major_version_ids`` to the versions allowed by the
    selected upgrade types.

    Records without upgrade types or without origin versions are untouched.
    """
    for line in self:
        if not (line.upgrade_type_ids and line.from_major_version_ids):
            continue
        still_allowed = line.from_major_version_ids & line.allowed_from_major_version_ids
        line.from_major_version_ids = still_allowed
@api.constrains("sequence", "my_dependencies_ids")
def _check_sequence_by_dependencies(self):
    """Ensure each upgrade line has a sequence strictly greater than the
    sequence of every line it depends on.

    Each record is checked on its own, so the constraint also works when it
    is triggered on several records at once (batch create/write).

    :raises ValidationError: listing the offending dependencies.
    """
    for line in self:
        offenders = line.my_dependencies_ids.filtered(lambda dep: line.sequence <= dep.sequence)
        if not offenders:
            continue
        message = "".join(f"{dep.name}: Sequence {dep.sequence}\n" for dep in offenders)
        raise ValidationError(
            f"The sequence of this UL cannot be less than or equal to the sequences of its dependencies: \n {message}"
        )
@api.constrains("sequence", "depends_on_me_ids")
def _check_sequence_by_reverse_dependencies(self):
    """Ensure each upgrade line has a sequence strictly lower than the
    sequence of every line that depends on it.

    Each record is checked on its own, so the constraint also works when it
    is triggered on several records at once (batch create/write).

    :raises ValidationError: listing the offending dependent lines.
    """
    for line in self:
        offenders = line.depends_on_me_ids.filtered(lambda dep: line.sequence >= dep.sequence)
        if not offenders:
            continue
        message = "".join(f"{dep.name}: Sequence {dep.sequence}\n" for dep in offenders)
        raise ValidationError(
            f"The sequence of this UL cannot be greater than or equal to the sequences of ULs that depend on it: \n {message}"
        )
@api.constrains("new_filter_domain", "old_filter_domain")
def check_filter_domain_value(self):
    """Validate that both filter-domain expressions can be evaluated.

    Each non-empty expression is evaluated with ``safe_eval`` using the
    context from ``_get_filters_eval_context``; any failure is reported as
    a ``ValidationError`` naming the field and the script.
    """
    for line in self:
        old_expression = line.old_filter_domain
        if old_expression:
            try:
                safe_eval.safe_eval(old_expression, line._get_filters_eval_context())
            except Exception as err:
                raise ValidationError(
                    _("Error in 'Old Filter Domain' for script '%s':\n%s") % (line.name, str(err))
                )

        new_expression = line.new_filter_domain
        if new_expression:
            try:
                safe_eval.safe_eval(new_expression, line._get_filters_eval_context())
            except Exception as err:
                raise ValidationError(
                    _("Error in 'New Filter Domain' for script '%s':\n%s") % (line.name, str(err))
                )
@api.constrains("upgrade_ticket_ids", "upgrade_type_ids")
def _check_upgrade_line_usage(self):
    """Require every upgrade line to be linked to a ticket or an upgrade type."""
    for line in self:
        is_used = line.upgrade_ticket_ids or line.upgrade_type_ids
        if not is_used:
            raise ValidationError(_("An upgrade line must be used in either tickets or upgrade types"))
def copy_data(self, default=None):
    """Prepare the values used to duplicate upgrade lines.

    For every copied record: the name (and the title, when the line adds a
    customer note) gets a " (copy)" suffix, and the approved script/message
    become the *dev* script/message of the copy so they must be approved
    again.

    Each values dict is paired with the record it was built from, so the
    method also works when several records are copied at once.

    :param default: optional dict of field values overriding the copy.
    :return: list of value dicts, one per record in ``self``.
    """
    vals_list = super().copy_data(default=default)
    for line, vals in zip(self, vals_list):
        with_note = line.add_customer_note
        vals.update(
            {
                "name": "%s (copy)" % line.name,
                "title": "%s (copy)" % line.title if with_note else False,
                "dev_script": line.script,
                "dev_message": line.message if with_note else False,
            }
        )
    return vals_list
def write(self, vals):
    """Write ``vals`` and keep the customer notes consistent with it.

    * When ``upgrade_type_ids`` is written: notes whose ticket targets a
      version group no longer covered by the line are deleted; if the
      upgrade types were cleared, all notes of the line are deleted.
    * When ``add_customer_note`` is written: the notes are archived or
      unarchived to follow the new flag.
    """
    result = super().write(vals)
    types_written = "upgrade_type_ids" in vals
    note_flag_written = "add_customer_note" in vals

    for line in self:
        if types_written:
            if not line.upgrade_type_ids:
                # Upgrade types were cleared: no note remains valid.
                line.customer_note_ids.unlink()
            else:
                allowed_versions = set(line.upgrade_type_ids.mapped("target_odoo_version_group_id"))
                stale_notes = line.customer_note_ids.filtered(
                    lambda note: note.ticket_id.upgrade_type_id.target_odoo_version_group_id
                    not in allowed_versions
                )
                if stale_notes:
                    stale_notes.unlink()

        if note_flag_written:
            if line.add_customer_note:
                line.customer_note_ids.action_unarchive()
            else:
                line.customer_note_ids.action_archive()
    return result
def action_archive(self):
    """Archive the upgrade lines.

    Before archiving, every customer note of each line (archived ones
    included) is deleted with sudo and the line goes back to ``draft``.

    :return: whatever the parent ``action_archive`` returns (previously
        the result was silently dropped).
    """
    for rec in self:
        # active_test=False so archived notes are deleted as well.
        rec.with_context(active_test=False).sudo().customer_note_ids.unlink()
        rec.state = "draft"
    return super().action_archive()
def action_get_script_versions(self) -> dict:
    """Return the window action listing the script versions of this line."""
    self.ensure_one()
    action = self.env["ir.actions.act_window"]._for_xml_id(
        "saas_provider_upgrade.action_saas_upgrade_line_script"
    )
    action["domain"] = [("upgrade_line_id", "=", self.id)]
    return action
def action_get_running_status(self) -> dict:
    """Return the window action listing the runs of this line that are
    attached to a request.
    """
    self.ensure_one()
    xml_id = "saas_provider_upgrade.action_saas_upgrade_line_request_run"
    action = self.env["ir.actions.act_window"]._for_xml_id(xml_id)
    only_this_line = ["upgrade_line_id", "=", self.id]
    with_request = ["request_id", "!=", False]
    action["domain"] = [only_this_line, with_request]
    return action
def action_check_filter_domain(self) -> dict | bool:
    """Test one of the filter domains against the databases of the request.

    The field to test comes from the ``field_type`` context key:
    ``old_filter_domain`` is checked on the original database of the
    request and ``new_filter_domain`` on the upgraded one.

    :return: a display-notification action on success, or ``False`` when
        there is no request, no ``field_type`` or an unknown ``field_type``.
    """
    self.ensure_one()
    database_field_by_filter = {
        "old_filter_domain": "original_database_id",
        "new_filter_domain": "upgraded_database_id",
    }
    field_name = self.env.context.get("field_type", False)
    if not field_name or not self.request_id:
        return False
    database_field = database_field_by_filter.get(field_name)
    if database_field is None:
        return False

    client = self.request_id[database_field].get_client_cluster()
    self._check_filter_domain(client, getattr(self, field_name))

    return self.build_display_notification_action(
        self.env._("Filter tested successfully"),
        self.env._("The filter has been tested and its result was successful"),
    )
def action_get_requests(self) -> dict:
    """Return the window action listing the requests this line can be
    tested on.

    Candidates are manual (non automatic) requests whose aim is "test",
    restricted by the database requirement of the line type, by its upgrade
    types and tickets, and finally by the modules installed on the main
    database of the ticket (all or any, following ``module_check_logic``).
    """
    self.ensure_one()
    domain = Domain("aim", "=", "test") & Domain("automatic", "=", False)

    requirement = UL_TYPE_DB_REQUIREMENTS.get(self.type, None)
    if requirement == "with_original_db":
        domain &= Domain("with_original_database", "=", True)
    elif requirement == "with_upgraded_db":
        domain &= Domain("with_upgraded_database", "=", True)

    if self.upgrade_type_ids:
        domain &= Domain("upgrade_type_id", "in", self.upgrade_type_ids.ids)
    if self.upgrade_ticket_ids:
        domain &= Domain("ticket_id", "in", self.upgrade_ticket_ids.ids)

    candidates = self.env["helpdesk.ticket.upgrade.request"].search(domain)
    if self.module_ids:
        wanted = set(self.module_ids.mapped("name"))

        def installed(req):
            # Module names installed on the main database of the ticket.
            return set(req.ticket_id.main_database_id.module_ids.mapped("name"))

        logic = self.module_check_logic
        if logic == "all":
            candidates = candidates.filtered(lambda req: wanted <= installed(req))
        elif logic == "any":
            candidates = candidates.filtered(lambda req: bool(wanted & installed(req)))

    action = self.env["ir.actions.act_window"]._for_xml_id(
        "saas_provider_upgrade.action_ticket_upgrade_request"
    )
    action["context"] = {"by_upgrade_line": True, "upgrade_line_id": self.id}
    action["domain"] = [("id", "in", candidates.ids)]
    return action
def action_approve_script_on_background(self) -> dict:
    """Check the line and queue a dev run flagged for approval.

    The description must be filled in and every defined filter domain is
    tested first; then ``_prepare_run`` is called with the ``approve`` and
    ``test_dev`` context flags.

    :raises UserError: when the description is empty.
    """
    self.ensure_one()
    if is_html_empty(self.description):
        raise UserError(_("The description must be complete to approve the script!"))

    for filter_field in ("old_filter_domain", "new_filter_domain"):
        if self[filter_field]:
            self.with_context(test_dev=True, field_type=filter_field).action_check_filter_domain()

    approving = self.with_context(approve=True, test_dev=True)
    return approving._prepare_run()
def action_approve_script(self):
    """Promote the dev script to a new approved script version.

    A new ``saas.upgrade.line.script`` record is created with the next
    version number and the dev execution mode; it becomes the current
    script, the dev script is cleared and the line is set to ``approved``.

    :raises UserError: when there is no dev script to approve.
    """
    self.ensure_one()
    self._validate_ops()
    pending_script = self.dev_script
    if not pending_script:
        raise UserError(_("The script cannot be empty when approving!"))

    version_vals = {
        "script": pending_script,
        "version": self.script_version + 1,
        "upgrade_line_id": self.id,
        "execution_mode": self.dev_execution_mode,
    }
    approved_version = self.env["saas.upgrade.line.script"].create(version_vals)
    self.write(
        {
            "actual_script_id": approved_version.id,
            "dev_script": False,
            "state": "approved",
        }
    )
def action_approve_message(self):
    """Promote the dev message to the approved customer-note message.

    The dev message is cleared, the line is set to ``approved`` and a note
    is posted in the chatter. Like the other approval actions it works on a
    single record, which is now enforced explicitly.
    """
    self.ensure_one()
    self.write(
        {
            "message": self.dev_message,
            "dev_message": "",
            "state": "approved",
        }
    )
    self.message_post(body=self.env._("Customer Note message approved"))
def action_copy_script(self):
    """Load the approved script and its execution mode into the dev fields."""
    self.ensure_one()
    approved_script, approved_mode = self.script, self.execution_mode
    self.dev_script = approved_script
    self.dev_execution_mode = approved_mode
def action_copy_message(self):
    """Load the approved customer-note message into the dev message."""
    self.ensure_one()
    approved_message = self.message
    self.dev_message = approved_message
def action_to_correct(self):
    """Mark the upgrade line as 'To Correct' and notify the people in charge.

    The partners of the product owner's user and of the technical expert's
    user (when they have one) receive a note. Each partner is notified only
    once, even if both roles are held by the same user, and the note text
    is translatable.
    """
    self.ensure_one()
    self.state = "to_correct"

    product = self.adhoc_product_id
    partner_ids = []
    for responsible in (product.product_owner_id, product.technical_expert_id):
        user = responsible.user_id
        if user and user.partner_id.id not in partner_ids:
            partner_ids.append(user.partner_id.id)

    if partner_ids:
        self.message_post(
            body=self.env._(
                "This script needs fixes; please check it and approve it again once it passes the test."
            ),
            partner_ids=partner_ids,
            subtype_xmlid="mail.mt_note",
            message_type="notification",
        )
def action_set_draft(self):
    """Move a single upgrade line back to the 'draft' state."""
    self.ensure_one()
    draft_state = "draft"
    self.state = draft_state
def action_approve(self):
    """Set a single upgrade line to 'approved' directly, without a test run."""
    self.ensure_one()
    approved_state = "approved"
    self.state = approved_state
def action_deactivate_script(self):
    """Add this line to the deactivated scripts of the current request.

    :raises UserError: when there is no request in context, or when the
        request is automatic or currently running.
    """
    self.ensure_one()
    upgrade_request = self.request_id
    if not upgrade_request:
        raise UserError(_("Cannot deactivate a script without a request!"))
    locked = upgrade_request.automatic or upgrade_request.status == "running"
    if locked:
        raise UserError(_("Cannot deactivate a script in a running request!"))
    # (4, id) links the record without touching the existing ones.
    upgrade_request.deactivated_script_ids = [(4, self.id)]
def action_prepare_run(self) -> dict | None:
    """Queue this line to run in the background on the current request.

    Whether the approved script or the dev script is run depends on the
    context flags handled further down the call chain.

    :raises RedirectWarning: when no request is in context; the warning
        offers a button opening the candidate requests.
    :raises UserError: when the request is automatic.
    """
    self.ensure_one()
    upgrade_request = self.request_id
    if not upgrade_request:
        raise RedirectWarning(
            message=_("A request is needed to run the script!"),
            action=self.action_get_requests(),
            button_text=_("See Requests"),
        )
    if upgrade_request.automatic:
        raise UserError(self.env._("Cannot run the script on an automatic request!"))

    self._validate_ops()
    return self._prepare_run()
def run(self, request: "UpgradeRequest") -> Markup | None:
    """Run this upgrade line on ``request``.

    Steps:
    * check that the database required by the request state exists;
    * skip the line if it already ran (state ``done``) unless ``test_dev``
      is set in the context;
    * mark the previous warning log entries of this line as solved;
    * pick the script version and dispatch on its execution mode.

    :param request: the request where the line will run.
    :return: a message for the user, or None when nothing has to be shown.
    :raises UserError: missing database or invalid execution mode.
    """
    self.ensure_one()
    test_dev = self.env.context.get("test_dev", False)
    original_db = request.original_database_id
    upgraded_db = request.upgraded_database_id
    stage = MAP_REQUEST_STATES.get(request.state, False)

    if stage == "before_odoo" and not original_db:
        raise UserError(self.env._("There is no original database!"))
    if stage == "after_odoo" and not upgraded_db:
        raise UserError(self.env._("There is no upgraded database!"))

    # Rebinding with the request in context makes `run_info_id` resolve
    # against this request.
    self = self.with_context(ticket_request_id=request.id)
    run_info = self.run_info_id or self.env["saas.upgrade.line.request.run"]
    already_done = run_info.state == "done"
    if already_done and not test_dev:
        return None

    # Mark previous warnings as solved
    previous_warnings = request.upgrade_line_request_log_entry_ids.filtered(
        lambda entry: entry.upgrade_line_id == self and entry.type == "warning"
    )
    previous_warnings.write({"solved": True})

    # Manage frozen scripts
    script_version = self._get_script_version()
    script = script_version.script
    execution_mode = script_version.execution_mode
    run_info.start(script_version)

    if execution_mode == "odooly":
        return self._run_odooly(request, run_info, script)
    if execution_mode == "odoo_shell":
        return self._run_odoo_shell(request, run_info, script)
    _logger.error("Invalid execution mode '%s' for upgrade line '%s'", execution_mode, self.name)
    raise UserError(_("Invalid execution mode for upgrade line '%s'") % self.name)
def _run_odooly(self, request: "UpgradeRequest", run_info: "UpgradeRequestRun", script: str) -> Markup | None:
    """
    Runs the script using odooly connections to the client clusters.

    The script is executed with ``safe_eval`` inside the evaluation context
    provided by the request. After execution, the ``result`` and ``breaks``
    names are read back from that context.

    :param request: The request where the script will run
    :param run_info: The run info record
    :param script: The source code to execute
    :return: The value returned by ``run_info.finish``
    :raises UpgradeLineException: when the script raises; the cursor is
        rolled back first and the original error is kept as the cause.
    :raises UserError: when the script set ``breaks`` and we are not in
        ``test_dev`` mode.
    """
    self.ensure_one()
    test_dev = self.env.context.get("test_dev", False)
    eval_context: dict = {}
    if self.type != "0_on_create":
        self.env["ir.cron"]._commit_progress()  # Commit to reflect the run_info state

    with request._get_eval_context(eval_context, upgrade_line=self):
        try:
            safe_eval.safe_eval(script, eval_context, mode="exec")
        except Exception as e:
            self.env.cr.rollback()
            lineno = self._extract_script_lineno(e)
            error = str(e)
            raise UpgradeLineException(self.env, request, self, error, lineno) from e

        result = eval_context.pop("result", None)
        breaks = eval_context.pop("breaks", None)
        if breaks and not test_dev:
            raise UserError(breaks)

        self._process_customer_note(eval_context, request, run_info)
        return run_info.finish(result=breaks or result)  # 'breaks' has priority over 'result' if it's defined
def _run_odoo_shell(self, request: "UpgradeRequest", run_info: "UpgradeRequestRun", script: str):
    """
    Schedules the script as a "scriptupgrade" task on the target database.

    The upgraded database is used when the line type requires it
    ("with_upgraded_db"); otherwise the original database is used.

    :param request: The request where the script will run
    :param run_info: The run info record
    :param script: The source code to execute
    :raises UserError: when the selected database does not exist
    """
    self.ensure_one()
    needs_upgraded_db = UL_TYPE_DB_REQUIREMENTS.get(self.type, None) == "with_upgraded_db"
    original_db = request.original_database_id
    upgraded_db = request.upgraded_database_id
    target_db = upgraded_db if needs_upgraded_db else original_db
    if not target_db:
        raise UserError(self.env._("There is no database to run the job!"))

    # The whole current context is forwarded as extra keyword arguments.
    target_db._schedule_task(
        "scriptupgrade",
        run_info_id=run_info.id,
        script=script,
        state_on_success=target_db.state,
        **self.env.context,
    )
def _applies(self, old_client: Client, new_client: Client) -> bool:
    """
    Checks if the script applies to the request.

    "After Done" lines always apply, and so does any line when there is no
    ``old_client``. Otherwise the old filter domain is checked on the old
    client and, for types requiring the upgraded database, the new filter
    domain is checked on the new client. Empty filter domains are skipped.

    :param old_client: Client connected to the original database (resolved externally to avoid repeated calls)
    :param new_client: Client connected to the upgraded database (resolved externally to avoid repeated calls)
    :return: True if the script applies, False otherwise.
    :raises UpgradeLineException: when evaluating a filter domain fails; the
        original error is kept as the cause.
    """
    self.ensure_one()
    # Always run the "after_done" scripts
    if self.type == "6_after_done" or not old_client:
        return True

    # We check old and new conditions if they are defined
    db_requirement = UL_TYPE_DB_REQUIREMENTS.get(self.type, None)
    checks = [(old_client, self.old_filter_domain)]
    if db_requirement == "with_upgraded_db":
        checks.append((new_client, self.new_filter_domain))

    for client, filter_domain in checks:
        if not filter_domain:
            continue
        try:
            matches = self._check_filter_domain(client, filter_domain)
        except Exception as e:
            raise UpgradeLineException(self.env, self.request_id, self, str(e)) from e
        if not matches:
            return False
    return True
855 def _prepare_run(self) -> dict | None:
856 """
857 Method that creates a 'saas.upgrade.line.request.run' record on 'pending' state
858 and triggers the cron to process it
859 """
860 for rec in self:
861 rec._create_run_info_id()
862 res = rec.run_info_id.enqueue()
863 return res
865 def _get_script_version(self) -> "UpgradeLineScript":
866 """
867 Function used to get the script to use
869 Condition: Active production freeze is True, we are in a production request and
870 UL type is not in UL_TYPES_FREEZE_IGNORES
872 1. If we are testing, return a temporary record with the dev_script and dev_execution_mode
873 2. If the UL does not have an script, and generates a customer note, we return a temporary record
874 with an empty script and 'odooly' as execution mode.
875 3. If 'condition' is True, we return the version to use, that is located in 'upgrade_lines_version_to_use' map
876 4. If 'condition' is False, we return the last version of the script
878 :return: The script register to execute
879 """
880 UpgradeLineScript = self.env["saas.upgrade.line.script"]
881 if self.env.context.get("test_dev", False):
882 return UpgradeLineScript.new(
883 {
884 "script": self.dev_script,
885 "execution_mode": self.dev_execution_mode,
886 "upgrade_line_id": self.id,
887 }
888 )
890 actual_script = self.actual_script_id
891 if not actual_script and self.add_customer_note:
892 return UpgradeLineScript.new(
893 {
894 "script": "",
895 "execution_mode": "odooly",
896 "upgrade_line_id": self.id,
897 }
898 )
900 if self.run_type == "production":
901 return actual_script
903 # The request is only required at this point
904 request = self.request_id
905 if not request: 905 ↛ 906line 905 didn't jump to line 906 because the condition on line 905 was never true
906 raise UserError(_("Cannot determine script version without a request!"))
908 ul_type = self.type
909 condition = (
910 request.active_production_freeze
911 and request.aim == "production"
912 and not self._production_freeze_ignore(ul_type)
913 )
914 match condition:
915 case True:
916 # 'upgrade_line_versions_to_use' is a mapping in the request that has
917 # the versions to use for each UL, with the
918 # following structure:
919 #
920 # {
921 # "<ul_id>": <version>,
922 # ...
923 # }
924 upgrade_line_versions_to_use = request._get_upgrade_line_versions_to_use()
925 version_to_use = upgrade_line_versions_to_use.get(str(self.id), -1)
926 script_version = self.upgrade_line_script_version_ids.filtered(lambda x: x.version == version_to_use)
927 if not script_version:
928 raise UserError(
929 self.env._("There must be a version for UL %s in the request %s!") % (self.id, request.id)
930 )
931 case False: 931 ↛ 934line 931 didn't jump to line 934 because the pattern on line 931 always matched
932 script_version = actual_script
934 return script_version
936 def _create_run_info_id(self):
937 """
938 Used to create a running status associated with the UL and the request
939 """
940 if not self.request_id:
941 raise UserError(_("Cannot create run info without a request!"))
943 if self.run_info_id:
944 return
945 self.run_info_id = self.env["saas.upgrade.line.request.run"].create(
946 {
947 "upgrade_line_id": self.id,
948 "request_id": self.request_id and self.request_id.id or False,
949 }
950 )
952 def _process_customer_note(self, context: dict, request: "UpgradeRequest", run_info: "UpgradeRequestRun"):
953 """
954 Method to process the customer note after running the script.
955 In test mode, it only prepares the placeholders.
956 Otherwise, it generates the customer note.
958 :param context: Context with variables from the script execution
959 :param request: The upgrade request
960 :param run_info: The run info record
961 """
962 test_dev = self.env.context.get("test_dev", False)
963 message = self.dev_message if test_dev else self.message
964 if request.aim == "test" and self.add_customer_note and message:
965 used_vars = self.extract_qweb_variables(message)
966 placeholders = (
967 {k: v for k, v in context.items() if (bool(v) and self.is_serializable(v) and k in used_vars)}
968 if used_vars
969 else {}
970 )
971 if test_dev:
972 run_info.write({"placeholders": placeholders})
973 else:
974 self.env["helpdesk.ticket.customer_note"].generate(self, request, placeholders)
def _check_filter_domain(self, client: Client, domain_to_check: str) -> bool:
    """
    Check whether a filter domain matches at least one record in the database.

    ``domain_to_check`` is evaluated with ``safe_eval`` into a mapping of model
    name to domain, for example::

        {
            "res.partner": "[('customer', '=', True)]",
            ...
        }

    Models are counted one by one and the check stops at the first match.

    :param client: Client connected to the database to check
    :param domain_to_check: Domain to check
    :return: True if any domain returns records; False otherwise, or when an error
        occurs and ``no_raise_on_filter_error`` is set in the context
    :raises UserError: If evaluating or searching fails and
        ``no_raise_on_filter_error`` is not set in the context
    """
    try:
        domains_by_model = safe_eval.safe_eval(domain_to_check, self._get_filters_eval_context())
        return any(client.env[model].search_count(domain) for model, domain in domains_by_model.items())
    except Exception as e:
        if self.env.context.get("no_raise_on_filter_error"):
            # Best-effort mode: log the problem and treat the filter as not matching
            _logger.warning("Filter domain validation failed for script '%s': %s", self.name, e)
            return False
        raise UserError(
            self.env._("An error was logged with a filter domain in the script '%s'.\n Error message:\n %s")
            % (self.name, e)
        )
1006 def _validate_ops(self):
1007 """
1008 Validate that the script does not contain forbidden operations.
1009 This is to prevent malicious code execution or unintended operations.
1010 """
1011 types = [x[0] for x in self._fields["type"].selection]
1012 forbidden_patterns = [
1013 (lambda _: True, r"^(?!\s*#)(.*\b(new|old_database\.env)\b.*)$"),
1014 (lambda _: True, r"^(?!\s*#)(.*request\.env.*)$"),
1015 (lambda r: types.index(r.type) <= types.index("3_pre-adhoc"), r"^(?!\s*#)(.*new_client.*)$"),
1016 ]
1018 script_version = self._get_script_version()
1019 script = script_version.script
1020 execution_mode = script_version.execution_mode
1021 msg = None
1022 match execution_mode:
1023 case "odooly":
1024 msg = safe_eval.test_python_expr(expr=script, mode="exec")
1025 case "odoo_shell":
1026 # Use compile in 'odoo_shell' to avoid import errors
1027 try:
1028 compile(script, "<string>", "exec")
1029 except Exception as e:
1030 msg = str(e)
1031 case _:
1032 raise UserError(_("Invalid execution mode for upgrade line '%s'") % self.name)
1034 if msg:
1035 raise ValidationError(_("Error validating script: %s") % msg)
1037 for condition, pattern in forbidden_patterns:
1038 if not condition(self):
1039 continue
1040 match = re.search(pattern, script, re.IGNORECASE)
1041 if match:
1042 operation = match.group(0)
1043 raise UserError(_("Forbidden operation on the execution: '%s'") % operation)
1045 def _extract_script_lineno(self, exc: BaseException) -> int | None:
1046 """
1047 Walk the full exception chain (exc → __cause__ → __context__) and collect
1048 all frames whose filename is '' or '<string>' (i.e. from safe_eval).
1049 Returns the line number of the *innermost* such frame, or None if not found.
1051 :param exc: The exception to analyze
1052 :return: The line number of the innermost frame from safe_eval, or None if not found
1053 """
1054 lineno = None
1055 seen = set()
1056 current: BaseException | None = exc
1057 while current is not None and id(current) not in seen:
1058 seen.add(id(current))
1059 for frame in traceback.extract_tb(current.__traceback__):
1060 filename, mb_lineno, _, _ = frame
1061 if filename in ("", "<string>"):
1062 lineno = mb_lineno # keep updating -> gets the innermost frame
1063 current = current.__cause__ or current.__context__
1064 return lineno
1066 #############################################################################
1068 # Function to populate demo data
def set_last_script_version(self):
    """
    Set ``actual_script_id`` on each record to the first script returned by a
    search on its scripts (limit 1, default model ordering).
    """
    script_model = self.env["saas.upgrade.line.script"]
    for line in self:
        own_scripts = [("upgrade_line_id", "=", line.id)]
        line.actual_script_id = script_model.search(own_scripts, limit=1)