Coverage for adhoc-cicd-odoo-odoo / odoo / tools / xml_utils.py: 10%
166 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1"""Utilities for generating, parsing and checking XML/XSD files on top of the lxml.etree module."""
3import base64
4import contextlib
5import logging
6import re
7import zipfile
8from io import BytesIO
10from lxml import etree
12from odoo.exceptions import UserError
13from odoo.tools.misc import file_open
# Public names exported on ``import *`` from this module.
__all__ = [
    "cleanup_xml_node",
    "load_xsd_files_from_url",
    "validate_xml_from_attachment",
]

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
def remove_control_characters(byte_node):
    """Strip from ``byte_node`` the bytes that fall outside the allowed XML character class.

    The class below mirrors the XML ``Char`` production:
    `Char ::= #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]`
    source: https://www.w3.org/TR/xml/

    NOTE: the class is written with code points but it is UTF-8 encoded and applied to
    bytes, so every byte of the encoded bounds takes part in the class. In practice this
    keeps tab, line feed, carriage return and every byte in 0x20-0xF4, and drops the rest
    (the other C0 control bytes and 0xF5-0xFF).

    :param bytes byte_node: raw XML content
    :return: ``byte_node`` without the rejected bytes
    :rtype: bytes
    """
    allowed_chars = (
        '\u0009',
        '\u000A',
        '\u000D',
        '\u0020-\uD7FF',
        '\uE000-\uFFFD',
        '\U00010000-\U0010FFFF',
    )
    forbidden_pattern = f"[^{''.join(allowed_chars)}]".encode()
    return re.sub(forbidden_pattern, b'', byte_node)
class odoo_resolver(etree.Resolver):
    """Odoo specific file resolver that can be added to the XML Parser.

    It will search filenames in the ir.attachments, so that documents referenced
    by the parsed file can be served from the database.
    """

    def __init__(self, env, prefix):
        """
        :param odoo.api.Environment env: environment used to search ``ir.attachment``
        :param str prefix: if truthy, ``{prefix}.{url}`` is searched instead of ``url``
        """
        super().__init__()
        self.env = env
        self.prefix = prefix

    def resolve(self, url, id, context):
        """Search url in ``ir.attachment`` and return the resolved content.

        The signature is the one lxml calls resolvers with; ``id`` is not used here.

        :param str url: name of the requested document
        :param id: identifier passed along by lxml (unused)
        :param context: opaque lxml context, forwarded to ``resolve_string``
        :return: the resolved attachment content, or ``None`` when no attachment
            has the expected name
        """
        attachment_name = f'{self.prefix}.{url}' if self.prefix else url
        # The search is by name only and may match several records; ``limit=1``
        # guarantees that reading ``raw`` below is done on a single record.
        attachment = self.env['ir.attachment'].search([('name', '=', attachment_name)], limit=1)
        if attachment:
            return self.resolve_string(attachment.raw, context)
        return None
def _validate_xml(env, url, path, xmls):
    """Validate one or several XML documents against an XSD given by path or URL.

    The XSD is stored as ``ir.attachment`` (read from ``path`` if given, otherwise
    loaded from ``url``), used for the validation, and deleted afterwards, even
    when the validation raises.

    If no XSD could be obtained, a warning is logged and the validation is skipped.

    :param odoo.api.Environment env: environment used to store and look up the XSD
    :param str url: URL of the XSD, only used when ``path`` is falsy
    :param str path: path of a local ``.xsd`` file, takes precedence over ``url``
    :param xmls: a single XML document or a list of them; each one is passed to
        :func:`validate_xml_from_attachment`
    """
    # Get the XSD data
    xsd_attachment = env['ir.attachment']
    if path:
        with file_open(path, filter_ext=('.xsd',)) as file:
            content = file.read()
        xsd_attachment = env['ir.attachment'].create({
            'name': path.split('/')[-1],
            'datas': base64.b64encode(content.encode()),
        })
    elif url:
        # load_xsd_files_from_url() returns False on failure: keep the empty recordset then
        xsd_attachment = load_xsd_files_from_url(env, url) or xsd_attachment

    if not xsd_attachment:
        _logger.warning("No XSD could be loaded (path: %s, url: %s), skipping validation", path, url)
        return

    # Validate the XML against the XSD
    if not isinstance(xmls, list):
        xmls = [xmls]

    try:
        for xml in xmls:
            validate_xml_from_attachment(env, xml, xsd_attachment.name)
    finally:
        # Do not leave the XSD attachment behind when a document is invalid
        xsd_attachment.unlink()
def _check_with_xsd(tree_or_str, stream, env=None, prefix=None):
    """Check an XML against an XSD schema.

    This will raise a UserError if the XML file is not valid according to the
    XSD file.

    :param str | etree._Element tree_or_str: representation of the tree to be checked
    :param io.IOBase | str stream: the byte stream used to build the XSD schema.
        If env is given, it can also be the name of an attachment in the filestore
    :param odoo.api.Environment env: If it is given, it enables resolving the
        imports of the schema in the filestore with ir.attachments.
    :param str prefix: if given, provides a prefix to try when
        resolving the imports of the schema. e.g. prefix='l10n_cl_edi' will
        enable 'SiiTypes_v10.xsd' to be resolved to 'l10n_cl_edi.SiiTypes_v10.xsd'.
    :raises FileNotFoundError: if ``stream`` is an attachment name ending with
        ``.xsd`` and no attachment has that name
    :raises UserError: if the XML is not valid; the message lists the schema errors
    """
    if not isinstance(tree_or_str, etree._Element):
        tree_or_str = etree.fromstring(tree_or_str)
    parser = etree.XMLParser()
    if env:
        parser.resolvers.add(odoo_resolver(env, prefix))
        if isinstance(stream, str) and stream.endswith('.xsd'):
            # The search is by name only and may match several records; ``limit=1``
            # guarantees that reading ``raw`` below is done on a single record.
            attachment = env['ir.attachment'].search([('name', '=', stream)], limit=1)
            if not attachment:
                raise FileNotFoundError()
            stream = BytesIO(attachment.raw)
    xsd_schema = etree.XMLSchema(etree.parse(stream, parser=parser))
    try:
        xsd_schema.assertValid(tree_or_str)
    except etree.DocumentInvalid as xml_errors:
        # Keep the lxml exception as the explicit cause of the user-facing error
        raise UserError('\n'.join(str(e) for e in xml_errors.error_log)) from xml_errors
def create_xml_node_chain(first_parent_node, nodes_list, last_node_value=None):
    """Build a chain of nested nodes below ``first_parent_node``.

    One node is created per tag of ``nodes_list``, each of them being the child
    of the node created just before it (the first one is a child of
    ``first_parent_node``).

    :param etree._Element first_parent_node: parent of the created tree/chain
    :param Iterable[str] nodes_list: tag names to be created
    :param str last_node_value: if not None, text set on the deepest node of the
        chain (on ``first_parent_node`` itself when ``nodes_list`` is empty)
    :returns: the list of created nodes, outermost first
    :rtype: list[etree._Element]
    """
    # The chain starts with the given parent so that ``chain[-1]`` is always
    # the node under which the next one must be created.
    chain = [first_parent_node]
    for tag_name in nodes_list:
        chain.append(etree.SubElement(chain[-1], tag_name))

    if last_node_value is not None:
        chain[-1].text = last_node_value
    # The given parent was not created here: leave it out of the result
    return chain[1:]
def create_xml_node(parent_node, node_name, node_value=None):
    """Create a single node below ``parent_node``.

    :param etree._Element parent_node: parent of the created node
    :param str node_name: tag of the created node
    :param str node_value: text of the created node (optional)
    :returns: the created node
    :rtype: etree._Element
    """
    # A chain of exactly one tag: the only created node is the one to return
    created_nodes = create_xml_node_chain(parent_node, [node_name], node_value)
    return created_nodes[0]
def cleanup_xml_node(xml_node_or_string, remove_blank_text=True, remove_blank_nodes=True, indent_level=0, indent_space=" "):
    """Clean up the sub-tree of the provided XML node.

    If the provided XML node is of type:
    - etree._Element, it is modified in-place.
    - string/bytes, it is first parsed into an etree._Element. The content goes
      through `remove_control_characters` and is parsed with a recovering parser
      that does not resolve entities.

    The tree is walked depth-first, children before their parent. The root node
    is never removed, and only nodes without children can be blanked or removed.

    :param xml_node_or_string (etree._Element, str): XML node (or its string/bytes representation)
    :param remove_blank_text (bool): if True, removes whitespace-only text from nodes
    :param remove_blank_nodes (bool): if True, removes leaf nodes with no text (iterative, depth-first, done after remove_blank_text)
    :param indent_level (int): depth or level of node within root tree (use -1 to leave indentation as-is)
    :param indent_space (str): string to use for indentation (use '' to remove all indentation)
    :returns (etree._Element): clean node, same instance that was received (if applicable)
    """
    xml_node = xml_node_or_string

    # Convert str/bytes to etree._Element
    if isinstance(xml_node, str):
        xml_node = xml_node.encode()  # misnomer: fromstring actually reads bytes
    if isinstance(xml_node, bytes):
        parser = etree.XMLParser(recover=True, resolve_entities=False)
        xml_node = etree.fromstring(remove_control_characters(xml_node), parser=parser)

    # Process leaf nodes iteratively
    # Depth-first, so any inner node may become a leaf too (if children are removed)
    def leaf_iter(parent_node, node, level):
        # Children first; a negative level stays negative for the whole sub-tree,
        # otherwise each nesting step adds one level of indentation.
        for child_node in node:
            leaf_iter(node, child_node, level if level < 0 else level + 1)

        # Indentation (skipped altogether when level is negative)
        if level >= 0:
            indent = '\n' + indent_space * level
            # Only blank tails are rewritten: text following the node is kept
            if not node.tail or not node.tail.strip():
                node.tail = '\n' if parent_node is None else indent
            # len(node) is the number of children left after the recursion above
            if len(node) > 0:
                if not node.text or not node.text.strip():
                    # First child's indentation is parent's text
                    node.text = indent + indent_space
                last_child = node[-1]
                if last_child.tail == indent + indent_space:
                    # Last child's tail is parent's closing tag indentation
                    last_child.tail = indent

        # Removal condition: node is leaf (not root nor inner node)
        if parent_node is not None and len(node) == 0:
            if remove_blank_text and node.text is not None and not node.text.strip():
                # node.text is None iff node.tag is self-closing (text='' creates closing tag)
                node.text = ''
            # Both None and '' count as "no text" here
            if remove_blank_nodes and not (node.text or ''):
                parent_node.remove(node)

    # The root has no parent, hence the None
    leaf_iter(None, xml_node, indent_level)
    return xml_node
def _save_xsd_attachment(env, file_name, content, xsd_name_prefix):
    """Store ``content`` in the ``ir.attachment`` named after ``file_name``.

    The attachment name is ``{xsd_name_prefix}.{file_name}`` when a prefix is given,
    ``file_name`` otherwise. An existing attachment with that name gets its content
    replaced, otherwise a new public attachment is created.

    :return: the updated or created attachment
    """
    prefixed_xsd_name = f"{xsd_name_prefix}.{file_name}" if xsd_name_prefix else file_name
    fetched_attachment = env['ir.attachment'].search([('name', '=', prefixed_xsd_name)], limit=1)
    if fetched_attachment:
        _logger.info("Updating the content of ir.attachment with name: %s", prefixed_xsd_name)
        fetched_attachment.raw = content
        return fetched_attachment

    _logger.info("Saving XSD file as ir.attachment, with name: %s", prefixed_xsd_name)
    return env['ir.attachment'].create({
        'name': prefixed_xsd_name,
        'raw': content,
        'public': True,
    })


def load_xsd_files_from_url(env, url, file_name=None, force_reload=False,
                            request_max_timeout=10, xsd_name_prefix='', xsd_names_filter=None, modify_xsd_content=None):
    """Load XSD file or ZIP archive. Save XSD files as ir.attachment.

    An XSD attachment is saved as {xsd_name_prefix}.{filename} where the filename is either the filename obtained
    from the URL or from the ZIP archive, or the `file_name` param if it is specified and a single XSD is being downloaded.
    A typical prefix is the calling module's name.

    For ZIP archives, XSD files inside it will be saved as attachments, depending on the provided list of XSD names.
    ZIP archive themselves are not saved.

    The XSD files content can be modified by providing the `modify_xsd_content` function as argument.
    Typically, this is used when XSD files depend on each other (with the schemaLocation attribute),
    but it can be used for any purpose.

    :param odoo.api.Environment env: environment of calling module
    :param str url: URL of XSD file/ZIP archive
    :param str file_name: used as attachment name if the URL leads to a single XSD, otherwise ignored
    :param bool force_reload: Deprecated.
    :param int request_max_timeout: maximum time (in seconds) before the request times out
    :param str xsd_name_prefix: if provided, will be added as a prefix to every XSD file name
    :param list | str xsd_names_filter: if provided, will only save the XSD files with these names;
        a single string is treated as one file name
    :param func modify_xsd_content: function that takes the xsd content as argument and returns a modified version of it
    :rtype: odoo.api.ir.attachment | bool
    :return: every XSD attachment created/fetched or False if an error occurred (see warning logs)
    """
    import requests  # noqa: PLC0415
    try:
        _logger.info("Fetching file/archive from given URL: %s", url)
        response = requests.get(url, timeout=request_max_timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        _logger.warning('HTTP error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.ConnectionError as error:
        _logger.warning('Connection error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.Timeout as error:
        _logger.warning('Request timeout: %s with the given URL: %s', error, url)
        return False

    content = response.content
    if not content:
        _logger.warning("The HTTP response from %s is empty (no content)", url)
        return False

    # Anything that is not a valid ZIP archive is handled as a single XSD file
    archive = None
    with contextlib.suppress(zipfile.BadZipFile):
        archive = zipfile.ZipFile(BytesIO(content))

    if archive is None:
        if modify_xsd_content:
            content = modify_xsd_content(content)
        if not file_name:
            file_name = url.split('/')[-1]
            _logger.info("XSD name not provided, defaulting to %s", file_name)
        return _save_xsd_attachment(env, file_name, content, xsd_name_prefix)

    # A bare string must be compared as a whole name: with ``in`` on the string
    # itself, any file whose name is a substring of the filter would be accepted.
    if isinstance(xsd_names_filter, str):
        xsd_names_filter = [xsd_names_filter]

    saved_attachments = env['ir.attachment']
    with archive:
        for file_path in archive.namelist():
            if not file_path.endswith('.xsd'):
                continue

            # Attachments are named after the file only, not its folder in the archive
            file_name = file_path.rsplit('/', 1)[-1]

            if xsd_names_filter and file_name not in xsd_names_filter:
                _logger.info("Skipping file with name %s in ZIP archive", file_name)
                continue

            try:
                content = archive.read(file_path)
            except KeyError:
                _logger.warning("Failed to retrieve XSD file with name %s from ZIP archive", file_name)
                continue
            if modify_xsd_content:
                content = modify_xsd_content(content)

            saved_attachments |= _save_xsd_attachment(env, file_name, content, xsd_name_prefix)

    return saved_attachments
def validate_xml_from_attachment(env, xml_content, xsd_name, reload_files_function=None, prefix=None):
    """Try and validate the XML content with an XSD attachment.

    Problems with the XSD itself never raise: if the XSD attachment cannot be found,
    the validation is skipped, and if the XSD cannot be parsed as a schema, the
    errors are only logged. Any other exception raised by
    :func:`odoo.tools.xml_utils._check_with_xsd` propagates to the caller.

    :param odoo.api.Environment env: environment of calling module
    :param xml_content: the XML content to validate
    :param xsd_name: the XSD file name in database
    :param reload_files_function: Deprecated.
    :param str prefix: if given, the attachment looked up is ``{prefix}.{xsd_name}``,
        and the prefix is also forwarded to ``_check_with_xsd``
    """
    schema_name = xsd_name if not prefix else f"{prefix}.{xsd_name}"

    _logger.info("Validating with XSD...")
    try:
        _check_with_xsd(xml_content, schema_name, env, prefix)
    except FileNotFoundError:
        _logger.info("XSD file not found, skipping validation")
    except etree.XMLSchemaParseError as schema_error:
        _logger.error("XSD file not valid: ")
        for message in schema_error.args:
            _logger.error(message)
    else:
        _logger.info("XSD validation successful!")
def find_xml_value(xpath, xml_element, namespaces=None):
    """Return the text of the first result of an XPath query.

    :param str xpath: XPath expression to evaluate
    :param xml_element: node on which ``xpath(...)`` is called
    :param namespaces: passed as the ``namespaces`` keyword of ``xpath(...)``
    :return: the ``text`` of the first result, or None when there is no result
    """
    matches = xml_element.xpath(xpath, namespaces=namespaces)
    if not matches:
        return None
    return matches[0].text