Coverage for ingadhoc-odoo-saas-adhoc / saas_provider_upgrade / models / saas_upgrade_line_request_log.py: 30%
106 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 20:33 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 20:33 +0000
1import datetime
2import logging
3import re
4from typing import TYPE_CHECKING, Literal
6from odoo import _, api, fields, models
8if TYPE_CHECKING:
9 from ..models.helpdesk_ticket_upgrade_request import HelpdeskTicketUpgradeRequest as UpgradeRequest
10 from ..models.saas_upgrade_line import SaasUpgradeLine as UpgradeLine
11 from ..models.saas_upgrade_line_request_log import SaasUpgradeLineRequestLog as UpgradeLog
12 from ..models.saas_upgrade_line_request_log_entry import SaasUpgradeLineRequestLogEntry as UpgradeLogEntry
14_logger = logging.getLogger(__name__)
class SaasUpgradeLineRequestLog(models.Model):
    """Log of messages (info / warning / error) raised for an upgrade line.

    A log stores the first content that created it (``first_content``) and
    groups the entries (``entry_ids``) whose content is considered similar to
    it; see :meth:`create_new_entry` for the grouping rules.
    """

    _name = "saas.upgrade.line.request.log"
    _description = "Upgrade line request log"
    _inherit = ["mail.thread", "mail.activity.mixin", "saas.provider.upgrade.util"]
    _order = "id desc"

    # Upgrade line this log belongs to; the log is removed together with it.
    upgrade_line_id = fields.Many2one(
        "saas.upgrade.line",
        ondelete="cascade",
        readonly=True,
    )
    # Severity of the log.
    type = fields.Selection(
        [("info", "Info"), ("warning", "Warning"), ("error", "Error")],
        default="error",
        readonly=True,
    )
    # Content of the message that created the log; new contents are compared
    # against it in ``create_new_entry``.
    first_content = fields.Text(
        readonly=True,
    )
    entry_ids = fields.One2many(
        "saas.upgrade.line.request.log.entry",
        "log_id",
        readonly=True,
        help="Entries of the log",
    )
    # Product taken from the upgrade line (related field).
    adhoc_product_id = fields.Many2one(
        related="upgrade_line_id.adhoc_product_id",
        domain=[("parent_id", "!=", False), ("type", "=", "horizontal")],
    )
    ticket_id = fields.Many2one(
        "helpdesk.ticket",
        string="Fixing Ticket",
        readonly=True,
        help="Ticket where the entry is being or was fixed",
    )
    # Stage of the fixing ticket, stored on the log.
    stage_id = fields.Many2one(
        related="ticket_id.stage_id",
        store=True,
    )
    # Computed from the ticket stage, but stored and editable.
    solved = fields.Boolean(
        compute="_compute_solved",
        store=True,
        readonly=False,
    )

    @api.depends("ticket_id.stage_id")
    def _compute_solved(self):
        """Set ``solved`` from the ``fold`` flag of the fixing ticket's stage.

        Records without a ticket are skipped: no value is assigned for them
        here.
        """
        for rec in self:
            if rec.ticket_id:
                rec.solved = rec.ticket_id.stage_id.fold

    @api.depends("upgrade_line_id", "type")
    def _compute_display_name(self):
        """Build the display name as ``"<type letter> - <upgrade line name>"``.

        The letter is ``E``, ``W`` or ``I``; it is empty for an unknown or
        unset type, and the name part is empty when the upgrade line has no
        name.
        """
        type_map = {
            "error": "E",
            "warning": "W",
            "info": "I",
        }
        for rec in self:
            ul_name = rec.upgrade_line_id.name or ""
            msg_type = type_map.get(rec.type, "")
            rec.display_name = f"{msg_type} - {ul_name}"

    def write(self, vals):
        """Write ``vals`` on the logs, re-activating requests when solving.

        When ``vals`` sets ``solved`` to a truthy value, ``action_activate()``
        is called, before the actual write, on every request linked to the
        entries of these logs that has status ``"error"``, is not in state
        ``"done"`` or ``"cancel"`` and is not automatic.

        :param vals: Values to write
        :return: Result of the parent ``write``
        """
        solved = vals.get("solved")
        if solved:
            requests = self.entry_ids.mapped("request_id").filtered(
                lambda r: r.status == "error" and r.state not in ["done", "cancel"] and not r.automatic
            )
            for request in requests:
                request.action_activate()
        return super().write(vals)

    def action_run(self):
        """
        Action to run the entries of the log. Called from form view.

        :return: Result of ``run()`` on the entries of this log
        """
        self.ensure_one()
        return self.entry_ids.run()

    @api.model
    def create_new_entry(
        self,
        request: "UpgradeRequest",
        upgrade_line: "UpgradeLine",
        type: Literal["info", "warning", "error"],
        content: str,
    ) -> tuple["UpgradeLog", "UpgradeLogEntry"]:
        """
        Function used to create a new log entry.
        If an existing and similar log is found, it will add the entry to that log.
        If no existing and similar log is found, it will create a new log with the entry.
        If the chosen log already has an entry with the same content for the same
        request, that entry is reused and marked as not solved instead of creating
        a new one.

        :param request: Request record
        :param upgrade_line: Upgrade Line record
        :param type: Type of the log entry (info, warning, error)
        :param content: Content of the log entry
        :return: Tuple of (log, entry)
        """
        # Ensure we have access to all logs and entries for the upgrade line
        self = self.sudo()

        # Unsolved logs of the same upgrade line and type are the candidates
        logs = self.search(
            [
                ("upgrade_line_id", "=", upgrade_line.id),
                ("type", "=", type),
                ("solved", "=", False),
            ],
        )

        # Reuse the first candidate whose first content is similar to the new
        # content; otherwise start a new log with this content.
        if similar_logs := logs.filtered(lambda log: self.similar_content(log.first_content, content)):
            log = similar_logs[0]
        else:
            log = self.create(
                {
                    "upgrade_line_id": upgrade_line.id,
                    "type": type,
                    "first_content": content,
                }
            )

        if entries := log.entry_ids.filtered(lambda e: e.content == content and e.request_id == request):
            # Same message raised again for the same request: reopen the entry
            entry = entries[0]
            entry.write({"solved": False})
        else:
            entry = log.entry_ids.create(
                {
                    "log_id": log.id,
                    "request_id": request.id,
                    "content": content,
                    "script_id": upgrade_line.actual_script_id.id,
                }
            )
        return log, entry

    @api.model
    def log_message(
        self,
        upgrade_line: "UpgradeLine",
        request: "UpgradeRequest",
        msg: str,
        msg_type: Literal["info", "warning"] = "info",
    ):
        """
        Creates a log entry for the upgrade line in the request.
        Nothing is created when the ``test_dev`` context key is truthy.

        :param upgrade_line: The upgrade line record
        :param request: The upgrade request record
        :param msg: The message to log
        :param msg_type: The type of message ('info' or 'warning')
        :raises ValueError: If ``msg_type`` is neither 'info' nor 'warning'
        """
        test_dev = self.env.context.get("test_dev", False)
        # The type is validated even when running with ``test_dev``
        if msg_type not in ["info", "warning"]:
            raise ValueError(_("Invalid message type, it must be 'info' or 'warning'"))
        if not test_dev:
            self.env["saas.upgrade.line.request.log"].create_new_entry(
                request=request,
                upgrade_line=upgrade_line,
                type=msg_type,
                content=msg,
            )

    @api.model
    def create_entries_from_json(self, request: "UpgradeRequest", upgrade_line: "UpgradeLine", vals_list: list[dict]):
        """
        Function to create a log from a list of values in JSON format.
        Nothing is created when the ``test_dev`` context key is truthy.

        Each item is read through its ``"message"`` key (items with an empty or
        missing message are skipped) and its ``"type"`` key (``"info"`` when
        missing).

        :param request: The upgrade request record
        :param upgrade_line: The upgrade line record
        :param vals_list: List of dictionaries containing log entry values
        """
        if self.env.context.get("test_dev", False):
            return

        for vals in vals_list:
            content = vals.get("message", "")
            if not content:
                continue

            self.create_new_entry(
                request=request,
                upgrade_line=upgrade_line,
                type=vals.get("type", "info"),
                content=content,
            )

    @api.model
    def _parse(self, content: str):
        """
        Method to parse the log and return a formatted version.

        Everything from the first "while evaluating" onwards is dropped, a line
        break is inserted before every "line <number>" occurrence and each
        resulting line is wrapped with ``_split_by_max_len``.

        :param content: The log content to be parsed
        :return: Formatted log content
        """
        SPLIT_WORD = "while evaluating"
        content = content.split(SPLIT_WORD)[0]
        content = re.sub(r"line (\d+)", r"\nline \1", content)

        new_lines = []
        for line in content.splitlines():
            # Keep splitting the remainder until nothing meaningful is left.
            # ``_split_by_max_len`` consumes at least one token per call, so
            # this loop always terminates.
            while True:
                new_line, rest = self._split_by_max_len(line)
                new_lines.append(new_line)
                # A remainder of length 1 is only the "\t" continuation prefix
                if not rest or len(rest) == 1:
                    break
                line = rest
        return "\n".join(new_lines)

    @api.model
    def _split_by_max_len(self, line: str) -> tuple[str, str | None]:
        """
        Function to split a line by the maximum length allowed for logs.

        Tokens (separated by a single space) are accumulated, each followed by
        a space, until adding the next one would exceed the maximum length. The
        remaining tokens are returned prefixed with a tab.

        The first token is always accepted, even when it is longer than the
        maximum length on its own. Otherwise nothing would be consumed and the
        caller would receive the same (ever growing) remainder forever.

        :param line: The line to be split
        :return: tuple of (new_line, rest); ``rest`` is None when the whole
            line was consumed
        """
        LOG_MAX_LINE_LEN = 200
        tokens = line.split(" ")
        new_line = ""
        rest = None
        for i, token in enumerate(tokens):
            # ``new_line`` is only empty before the first token is taken
            if new_line and len(new_line) + len(token) > LOG_MAX_LINE_LEN:
                rest = "\t" + " ".join(tokens[i:])
                break
            new_line += token + " "
        return new_line, rest

    @api.autovacuum
    def _gc_childless_logs(self):
        """
        Method to delete logs without entries older than 1 day.
        The deletion runs with the ``bypass_delete`` context key set.
        """
        cutoff_date = fields.Datetime.now() - datetime.timedelta(days=1)
        logs_to_delete = self.search(
            [
                ("create_date", "<", cutoff_date),
                ("entry_ids", "=", False),
            ]
        )
        if logs_to_delete:
            _logger.info("Autovacuum: Deleting %d childless upgrade logs", len(logs_to_delete))
            logs_to_delete.with_context(bypass_delete=True).unlink()
            _logger.info("Autovacuum: Successfully deleted childless upgrade logs")