Coverage for ingadhoc-odoo-saas-adhoc / saas_provider_upgrade / wizards / adhoc_module_ready_to_upgrade.py: 17%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 19:24 +0000

1from typing import Any 

2 

3from odoo import _, fields, models 

4from odoo.exceptions import UserError 

5from odoo.models import BaseModel 

6 

7 

8class AdhocModuleReadyToUpgrade(models.TransientModel): 

9 _name = "adhoc.module.ready_to_upgrade.wizard" 

10 _description = "adhoc.module.ready_to_upgrade.wizard" 

11 

12 upgrade_type_id = fields.Many2one("saas.upgrade.type", string="Upgrade Type", required=True) 

13 from_version_id = fields.Many2one("saas.odoo.version.group", string="From Odoo Project", readonly=True) 

14 

15 def default_get(self, fields_list: list) -> dict: 

16 defaults = super().default_get(fields_list) 

17 databases = self.env["saas.database"].browse(self.env.context.get("active_ids")) 

18 odoo_projects = databases.odoo_version_group_id 

19 if len(odoo_projects) != 1: 

20 raise UserError(_("Databases must all be of the same version (same Odoo Project)")) 

21 defaults.update( 

22 { 

23 "from_version_id": odoo_projects.id, 

24 } 

25 ) 

26 return defaults 

27 

28 def _get_modules(self) -> BaseModel: 

29 actual_modules = ( 

30 self.env["saas.database"] 

31 .browse(self.env.context.get("active_ids")) 

32 .mapped("analytic_account_id.database_ids") 

33 .filtered("is_production") 

34 .mapped("module_ids") 

35 ) 

36 new_version_modules = actual_modules.search( 

37 [ 

38 ("name", "in", actual_modules.mapped("name")), 

39 ("major_version_id", "=", self.upgrade_type_id.target_odoo_version_group_id.major_version_id.id), 

40 ] 

41 ) 

42 return new_version_modules 

43 

44 def action_modules(self) -> dict: 

45 modules = self._get_modules() 

46 action = self.env["ir.actions.actions"]._for_xml_id("saas_provider_adhoc.action_adhoc_module_module") 

47 action.update( 

48 { 

49 "domain": [("id", "=", modules.ids)], 

50 } 

51 ) 

52 return action 

53 

54 def action_tasks(self) -> dict: 

55 self.ensure_one() 

56 modules = self._get_modules() 

57 action = self.env["ir.actions.actions"]._for_xml_id("project.dblc_proj") 

58 action.update( 

59 { 

60 "domain": [("id", "=", modules.mapped("task_ids").ids)], 

61 "context": {"project_id": modules.mapped("task_ids.project_id")[:1].id}, 

62 } 

63 ) 

64 return action 

65 

66 def check_database(self) -> Any: 

67 self.ensure_one() 

68 databases = self.env["saas.database"].browse(self.env.context.get("active_ids")) 

69 if len(databases) != 1: 

70 raise UserError(_("Database check can only be run on a single database")) 

71 database = databases 

72 if database.is_production: 

73 raise UserError("No puede correr este chequeo en bases productivas") 

74 ticket = self.env["helpdesk.ticket"].search( 

75 [("project_id", "in", database.project_ids.ids), ("upgrade_type_id", "=", self.upgrade_type_id.id)] 

76 ) 

77 if not ticket: 

78 ticket = self.env["helpdesk.ticket"].create( 

79 { 

80 "project_id": database.project_ids[0].id, 

81 "upgrade_type_id": self.upgrade_type_id.id, 

82 "name": "Dummy ticket to check database", 

83 "main_database_id": database.id, 

84 } 

85 ) 

86 request = self.env["helpdesk.ticket.upgrade.request"]._create_request(ticket.id) 

87 request.original_database_id = database.id 

88 results = [] 

89 for script in request.script_ids: 

90 try: 

91 result = script.run(request) 

92 if result: 

93 results.append(result) 

94 except Exception as e: 

95 results.append(str(e)) 

96 raise UserError(_("We found the following issues:\n%s") % "\n".join(results) if results else "No issues found.") 

97 

98 def create_upgrades(self) -> BaseModel | dict: 

99 databases = self.env["saas.database"].browse(self.env.context.get("active_ids")) 

100 tickets = self.env["helpdesk.ticket"] 

101 for db in databases: 

102 project = db.project_ids and db.project_ids[0] 

103 if not project: 

104 continue 

105 ticket = tickets.search( 

106 [ 

107 ("project_id", "=", project.id), 

108 ("upgrade_type_id", "=", self.upgrade_type_id.id), 

109 ("upgrade_team", "=", True), 

110 ], 

111 limit=1, 

112 ) 

113 if not ticket: 

114 upgrade_team = self.env["helpdesk.team"].search([("upgrade_team", "=", True)], limit=1) 

115 ticket = tickets.with_context(upgrade_ticket=True).create( 

116 { 

117 "name": "%s - %s" % (self.upgrade_type_id.name, project.name), 

118 "partner_id": db.active_subscription_id.partner_shipping_id.id, 

119 "project_id": project.id, 

120 "upgrade_type_id": self.upgrade_type_id.id, 

121 "adhoc_product_id": self.env.ref( 

122 "saas_provider_upgrade.actualizacion_de_version", raise_if_not_found=False 

123 ).id, 

124 "team_id": upgrade_team.id, 

125 "main_database_id": project.main_database_id.id if project.main_database_id else db.id, 

126 } 

127 ) 

128 tickets += ticket 

129 ticket.message_subscribe( 

130 partner_ids=( 

131 project.user_id.partner_id 

132 | db.portal_access_ids.filtered("manage_tickets_and_tasks").mapped("user_id.partner_id") 

133 ).ids 

134 ) 

135 if self.env.context.get("return_recordset"): 

136 return tickets 

137 return { 

138 "type": "ir.actions.act_window", 

139 "name": "Tickets de upgrade", 

140 "res_model": "helpdesk.ticket", 

141 "view_mode": "kanban,list,form", 

142 "domain": [("id", "in", tickets.ids)], 

143 }