Coverage for adhoc-cicd-odoo-odoo / odoo / tools / xml_utils.py: 10%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1"""Utilities for generating, parsing and checking XML/XSD files on top of the lxml.etree module.""" 

2 

3import base64 

4import contextlib 

5import logging 

6import re 

7import zipfile 

8from io import BytesIO 

9 

10from lxml import etree 

11 

12from odoo.exceptions import UserError 

13from odoo.tools.misc import file_open 

14 

# Names exported when this module is star-imported.
__all__ = [
    "cleanup_xml_node",
    "load_xsd_files_from_url",
    "validate_xml_from_attachment",
]

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

22 

23 

def remove_control_characters(byte_node):
    """Strip from ``byte_node`` the bytes that fall outside the XML ``Char`` class.

    The XML specification (https://www.w3.org/TR/xml/) states that processors
    must accept any character in the range specified for Char:
    `Char ::= #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]`

    The negated character class is written with those code points, then UTF-8
    encoded and applied to a bytes subject: the matching is therefore done on
    individual bytes, not on decoded characters.

    :param bytes byte_node: serialized XML content
    :return: the content with every matching byte removed
    :rtype: bytes
    """
    allowed_ranges = (
        '\u0009',
        '\u000A',
        '\u000D',
        '\u0020-\uD7FF',
        '\uE000-\uFFFD',
        '\U00010000-\U0010FFFF',
    )
    forbidden_pattern = '[^{}]'.format(''.join(allowed_ranges)).encode()
    return re.sub(forbidden_pattern, b'', byte_node)

43 

44 

class odoo_resolver(etree.Resolver):
    """Odoo specific file resolver that can be added to the XML Parser.

    It looks up the requested file names among the ``ir.attachment`` records.
    """

    def __init__(self, env, prefix):
        """Store the environment and the optional name prefix used for lookups.

        :param odoo.api.Environment env: environment used to search the attachments
        :param str prefix: if set, ``{prefix}.{url}`` is searched instead of ``url``
        """
        super().__init__()
        self.env = env
        self.prefix = prefix

    def resolve(self, url, id, context):
        """Search url in ``ir.attachment`` and return the resolved content.

        :param str url: file name requested by the parser
        :param id: public id passed by the parser (unused here)
        :param context: opaque parser context, forwarded to ``resolve_string``
        :return: the resolved attachment content, or ``None`` when no
            attachment carries the expected name
        """
        attachment_name = f'{self.prefix}.{url}' if self.prefix else url
        # limit=1: attachment names are not guaranteed to be unique here, and
        # reading ``raw`` below needs a single record.
        attachment = self.env['ir.attachment'].search([('name', '=', attachment_name)], limit=1)
        if attachment:
            return self.resolve_string(attachment.raw, context)
        return None

62 

63 

def _validate_xml(env, url, path, xmls):
    """Validate one or several XML documents against an XSD given by path or URL.

    The XSD is made available as an ``ir.attachment`` (created from the file at
    ``path``, or obtained through :func:`load_xsd_files_from_url` for ``url``),
    each XML is validated against it by name, and that attachment is unlinked
    afterwards, even when a validation raises.

    :param odoo.api.Environment env: environment used to access ``ir.attachment``
    :param str url: URL of the XSD, only used when ``path`` is falsy
    :param str path: path of a ``.xsd`` file readable through ``file_open``
    :param xmls: a single XML document or a list of them
    """
    # Get the XSD data
    xsd_attachment = env['ir.attachment']
    if path:
        with file_open(path, filter_ext=('.xsd',)) as file:
            content = file.read()
        attachment_vals = {
            'name': path.split('/')[-1],
            'datas': base64.b64encode(content.encode()),
        }
        xsd_attachment = env['ir.attachment'].create(attachment_vals)
    elif url:
        # NOTE(review): load_xsd_files_from_url returns False on download
        # errors; that case is not handled here and fails on ``.name`` below.
        xsd_attachment = load_xsd_files_from_url(env, url)

    # Validate the XML against the XSD
    if not isinstance(xmls, list):
        xmls = [xmls]

    try:
        for xml in xmls:
            validate_xml_from_attachment(env, xml, xsd_attachment.name)
    finally:
        # Always drop the attachment, including when a validation error
        # propagates out of the loop above.
        if xsd_attachment:
            xsd_attachment.unlink()

85 

86 

def _check_with_xsd(tree_or_str, stream, env=None, prefix=None):
    """Check an XML against an XSD schema.

    This will raise a UserError if the XML file is not valid according to the
    XSD file.

    :param str | etree._Element tree_or_str: representation of the tree to be checked
    :param io.IOBase | str stream: the byte stream used to build the XSD schema.
        If env is given, it can also be the name of an attachment in the filestore
    :param odoo.api.Environment env: If it is given, it enables resolving the
        imports of the schema in the filestore with ir.attachments.
    :param str prefix: if given, provides a prefix to try when
        resolving the imports of the schema. e.g. prefix='l10n_cl_edi' will
        enable 'SiiTypes_v10.xsd' to be resolved to 'l10n_cl_edi.SiiTypes_v10.xsd'.
    :raises FileNotFoundError: if ``env`` is given, ``stream`` is a name ending
        with '.xsd' and no attachment carries that name
    :raises UserError: if the XML is not valid according to the schema; the
        message lists the entries of the validation error log
    """
    if not isinstance(tree_or_str, etree._Element):
        tree_or_str = etree.fromstring(tree_or_str)
    parser = etree.XMLParser()
    if env:
        parser.resolvers.add(odoo_resolver(env, prefix))
        if isinstance(stream, str) and stream.endswith('.xsd'):
            # limit=1: reading ``raw`` below needs a single record, while
            # several attachments could share the same name.
            attachment = env['ir.attachment'].search([('name', '=', stream)], limit=1)
            if not attachment:
                raise FileNotFoundError()
            stream = BytesIO(attachment.raw)
    xsd_schema = etree.XMLSchema(etree.parse(stream, parser=parser))
    try:
        xsd_schema.assertValid(tree_or_str)
    except etree.DocumentInvalid as xml_errors:
        # Keep the lxml exception as the explicit cause of the UserError.
        raise UserError('\n'.join(str(e) for e in xml_errors.error_log)) from xml_errors

117 

118 

def create_xml_node_chain(first_parent_node, nodes_list, last_node_value=None):
    """Build a chain of nested nodes below ``first_parent_node``.

    One node is created per tag of ``nodes_list``; the first one is a child of
    ``first_parent_node`` and every following one is a child of the node
    created just before it.

    :param etree._Element first_parent_node: node under which the chain is created
    :param Iterable[str] nodes_list: tags of the nodes to create, outermost first
    :param str last_node_value: if not None, text assigned to the deepest node
        (to ``first_parent_node`` itself when ``nodes_list`` is empty)
    :returns: the created nodes, in creation order
    :rtype: list[etree._Element]
    """
    created_nodes = []
    deepest_node = first_parent_node
    for tag_name in nodes_list:
        deepest_node = etree.SubElement(deepest_node, tag_name)
        created_nodes.append(deepest_node)

    if last_node_value is None:
        return created_nodes

    deepest_node.text = last_node_value
    return created_nodes

140 

141 

def create_xml_node(parent_node, node_name, node_value=None):
    """Create a single child node under ``parent_node``.

    Thin wrapper around :func:`create_xml_node_chain` with a one-tag chain.

    :param etree._Element parent_node: node receiving the new child
    :param str node_name: tag of the new child
    :param str node_value: text of the new child (optional)
    :returns: the created node
    :rtype: etree._Element
    """
    created_nodes = create_xml_node_chain(parent_node, [node_name], node_value)
    return created_nodes[0]

151 

152 

def cleanup_xml_node(xml_node_or_string, remove_blank_text=True, remove_blank_nodes=True, indent_level=0, indent_space=" "):
    """Clean up the sub-tree of the provided XML node.

    If the provided XML node is of type:
    - etree._Element, it is modified in-place.
    - string/bytes, it is first parsed into an etree._Element (bytes go
      through `remove_control_characters` before being parsed)

    The tree is walked depth-first: children are handled before their parent,
    then the node is re-indented, then it is removed if it qualifies as a blank
    leaf. The root node itself is never removed.

    :param xml_node_or_string (etree._Element, str): XML node (or its string/bytes representation)
    :param remove_blank_text (bool): if True, removes whitespace-only text from nodes
    :param remove_blank_nodes (bool): if True, removes leaf nodes with no text (iterative, depth-first, done after remove_blank_text)
    :param indent_level (int): depth or level of node within root tree (use -1 to leave indentation as-is)
    :param indent_space (str): string to use for indentation (use '' to remove all indentation)
    :returns (etree._Element): clean node, same instance that was received (if applicable)
    """
    xml_node = xml_node_or_string

    # Convert str/bytes to etree._Element
    if isinstance(xml_node, str):
        xml_node = xml_node.encode()  # misnomer: fromstring actually reads bytes
    if isinstance(xml_node, bytes):
        # NOTE(review): recover=True / resolve_entities=False presumably make the
        # parser tolerant to malformed input and skip entity expansion - see lxml docs.
        parser = etree.XMLParser(recover=True, resolve_entities=False)
        xml_node = etree.fromstring(remove_control_characters(xml_node), parser=parser)

    # Process leaf nodes iteratively
    # Depth-first, so any inner node may become a leaf too (if children are removed)
    def leaf_iter(parent_node, node, level):
        # Children first. A negative level (indentation disabled) is passed down
        # unchanged, otherwise each child is one level deeper.
        # NOTE(review): a child may remove itself from `node` during this loop;
        # presumably safe with lxml's child iterator - confirm.
        for child_node in node:
            leaf_iter(node, child_node, level if level < 0 else level + 1)

        # Indentation
        if level >= 0:
            indent = '\n' + indent_space * level
            # Only blank/missing tails are rewritten; a tail with real text is kept.
            if not node.tail or not node.tail.strip():
                node.tail = '\n' if parent_node is None else indent
            if len(node) > 0:
                if not node.text or not node.text.strip():
                    # First child's indentation is parent's text
                    node.text = indent + indent_space
                last_child = node[-1]
                if last_child.tail == indent + indent_space:
                    # Last child's tail is parent's closing tag indentation
                    last_child.tail = indent

        # Removal condition: node is leaf (not root nor inner node)
        if parent_node is not None and len(node) == 0:
            if remove_blank_text and node.text is not None and not node.text.strip():
                # node.text is None iff node.tag is self-closing (text='' creates closing tag)
                node.text = ''
            if remove_blank_nodes and not (node.text or ''):
                parent_node.remove(node)

    # The root is processed with no parent, so it can never be removed.
    leaf_iter(None, xml_node, indent_level)
    return xml_node

205 

206 

def _save_xsd_attachment(env, prefixed_xsd_name, content):
    """Store ``content`` in the ``ir.attachment`` named ``prefixed_xsd_name``.

    The first attachment found with that name is updated; if there is none, a
    new public attachment is created.

    :param odoo.api.Environment env: environment used to access ``ir.attachment``
    :param str prefixed_xsd_name: name of the attachment to update or create
    :param bytes content: XSD content to store
    :return: the updated or newly created attachment
    """
    fetched_attachment = env['ir.attachment'].search([('name', '=', prefixed_xsd_name)], limit=1)
    if fetched_attachment:
        _logger.info("Updating the content of ir.attachment with name: %s", prefixed_xsd_name)
        fetched_attachment.raw = content
        return fetched_attachment

    _logger.info("Saving XSD file as ir.attachment, with name: %s", prefixed_xsd_name)
    return env['ir.attachment'].create({
        'name': prefixed_xsd_name,
        'raw': content,
        'public': True,
    })


def load_xsd_files_from_url(env, url, file_name=None, force_reload=False,
                            request_max_timeout=10, xsd_name_prefix='', xsd_names_filter=None, modify_xsd_content=None):
    """Load XSD file or ZIP archive. Save XSD files as ir.attachment.

    An XSD attachment is saved as {xsd_name_prefix}.{filename} where the filename is either the filename obtained
    from the URL or from the ZIP archive, or the `file_name` param if it is specified and a single XSD is being downloaded.
    A typical prefix is the calling module's name.

    For ZIP archives, XSD files inside it will be saved as attachments, depending on the provided list of XSD names.
    ZIP archive themselves are not saved.

    The XSD files content can be modified by providing the `modify_xsd_content` function as argument.
    Typically, this is used when XSD files depend on each other (with the schemaLocation attribute),
    but it can be used for any purpose.

    :param odoo.api.Environment env: environment of calling module
    :param str url: URL of XSD file/ZIP archive
    :param str file_name: used as attachment name if the URL leads to a single XSD, otherwise ignored
    :param bool force_reload: Deprecated.
    :param int request_max_timeout: maximum time (in seconds) before the request times out
    :param str xsd_name_prefix: if provided, will be added as a prefix to every XSD file name
    :param list | str xsd_names_filter: if provided, will only save the XSD files with these names
        (ZIP archives only); a single name given as a string is treated as a one-element list
    :param func modify_xsd_content: function that takes the xsd content as argument and returns a modified version of it
    :rtype: odoo.api.ir.attachment | bool
    :return: every XSD attachment created/fetched or False if an error occurred (see warning logs)
    """
    import requests  # noqa: PLC0415
    try:
        _logger.info("Fetching file/archive from given URL: %s", url)
        response = requests.get(url, timeout=request_max_timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        _logger.warning('HTTP error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.ConnectionError as error:
        _logger.warning('Connection error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.Timeout as error:
        _logger.warning('Request timeout: %s with the given URL: %s', error, url)
        return False

    content = response.content
    if not content:
        _logger.warning("The HTTP response from %s is empty (no content)", url)
        return False

    # Content that cannot be opened as a ZIP archive is handled as a single XSD file.
    archive = None
    with contextlib.suppress(zipfile.BadZipFile):
        archive = zipfile.ZipFile(BytesIO(content))

    if archive is None:
        if modify_xsd_content:
            content = modify_xsd_content(content)
        if not file_name:
            file_name = f"{url.split('/')[-1]}"
            _logger.info("XSD name not provided, defaulting to %s", file_name)

        prefixed_xsd_name = f"{xsd_name_prefix}.{file_name}" if xsd_name_prefix else file_name
        return _save_xsd_attachment(env, prefixed_xsd_name, content)

    # A bare string would make the `not in` test below a substring match
    # (e.g. 'a.xsd' is "in" 'data.xsd'), so normalise it to a list of names.
    if isinstance(xsd_names_filter, str):
        xsd_names_filter = [xsd_names_filter]

    saved_attachments = env['ir.attachment']
    # Close the archive once every member has been read.
    with archive:
        for file_path in archive.namelist():
            if not file_path.endswith('.xsd'):
                continue

            # Attachments are named after the file only, not its path inside the archive.
            file_name = file_path.rsplit('/', 1)[-1]

            if xsd_names_filter and file_name not in xsd_names_filter:
                _logger.info("Skipping file with name %s in ZIP archive", file_name)
                continue

            try:
                content = archive.read(file_path)
            except KeyError:
                _logger.warning("Failed to retrieve XSD file with name %s from ZIP archive", file_name)
                continue
            if modify_xsd_content:
                content = modify_xsd_content(content)

            prefixed_xsd_name = f"{xsd_name_prefix}.{file_name}" if xsd_name_prefix else file_name
            saved_attachments |= _save_xsd_attachment(env, prefixed_xsd_name, content)

    return saved_attachments

313 

314 

def validate_xml_from_attachment(env, xml_content, xsd_name, reload_files_function=None, prefix=None):
    """Try and validate the XML content with an XSD attachment.

    The check itself is delegated to :func:`odoo.tools.xml_utils._check_with_xsd`.
    If the XSD attachment cannot be found in database, validation is skipped
    without raising. An XSD that cannot be parsed as a schema is reported in the
    error log, without raising either. Any other exception raised by the check
    propagates to the caller.

    :param odoo.api.Environment env: environment of calling module
    :param xml_content: the XML content to validate
    :param xsd_name: the XSD file name in database
    :param reload_files_function: Deprecated.
    :param str prefix: if given, the attachment looked up is ``{prefix}.{xsd_name}``
        and the prefix is forwarded to the check
    """
    prefixed_xsd_name = xsd_name
    if prefix:
        prefixed_xsd_name = f"{prefix}.{xsd_name}"

    _logger.info("Validating with XSD...")
    try:
        _check_with_xsd(xml_content, prefixed_xsd_name, env, prefix)
    except FileNotFoundError:
        _logger.info("XSD file not found, skipping validation")
    except etree.XMLSchemaParseError as schema_error:
        _logger.error("XSD file not valid: ")
        for detail in schema_error.args:
            _logger.error(detail)
    else:
        _logger.info("XSD validation successful!")

337 

338 

def find_xml_value(xpath, xml_element, namespaces=None):
    """Return the text of the first result of ``xpath`` evaluated on ``xml_element``.

    :param str xpath: XPath expression to evaluate
    :param xml_element: object exposing an ``xpath(expr, namespaces=...)`` method
    :param namespaces: namespaces forwarded to the ``xpath`` call
    :return: the ``text`` of the first result, or None if there is no result
    """
    matches = xml_element.xpath(xpath, namespaces=namespaces)
    if not matches:
        return None
    return matches[0].text